[spark] Git Push Summary

2017-10-26 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/test2.2 [deleted] cb54f297a




spark git commit: [SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema

2017-10-26 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 2839280ad -> cb54f297a


[SPARK-22356][SQL] data source table should support overlapped columns between 
data and partition schema

This is a regression introduced by #14207. After Spark 2.1, we store the 
inferred schema when creating the table, to avoid inferring the schema again at 
the read path. However, there is one special case: overlapped columns between 
data and partition. This case breaks the assumption of the table schema that 
there is no overlap between the data and partition schema, and that partition 
columns should be at the end. The result is that, for Spark 2.1, the table scan 
has an incorrect schema that puts partition columns at the end. For Spark 2.2, 
we add a check in CatalogTable to validate the table schema, which fails in 
this case.

To fix this issue, a simple and safe approach is to fall back to the old 
behavior when overlapped columns are detected, i.e. store an empty schema in 
the metastore.
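
For reference, a minimal sketch (not part of this patch; the path and column 
names are made up) of how the data and partition schemas can overlap: the data 
files themselves contain the column that is also encoded in the partition 
directories.

```scala
// Hedged sketch: builds the "overlapped columns" layout described above.
// /tmp/overlap_demo and the column names are hypothetical.
import org.apache.spark.sql.SparkSession

object OverlapDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("overlap-demo").getOrCreate()
    import spark.implicits._

    // Write files that contain column `p` *inside* the data, into directories
    // that also encode `p` as a partition value -> data and partition schemas overlap.
    Seq((1, "a")).toDF("id", "p").write.parquet("/tmp/overlap_demo/p=a")
    Seq((2, "b")).toDF("id", "p").write.parquet("/tmp/overlap_demo/p=b")

    // Creating a table over this layout hits the case handled here: Spark falls
    // back to storing an empty schema in the metastore and infers it at read time.
    spark.sql(
      """CREATE TABLE overlap_demo USING parquet
        |OPTIONS (path '/tmp/overlap_demo')""".stripMargin)
    spark.table("overlap_demo").show()

    spark.stop()
  }
}
```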

new regression test

Author: Wenchen Fan 

Closes #19579 from cloud-fan/bug2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb54f297
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb54f297
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb54f297

Branch: refs/heads/branch-2.2
Commit: cb54f297ae52690e6162b2bab9a3940d38ff82f2
Parents: 2839280
Author: Wenchen Fan 
Authored: Thu Oct 26 17:39:53 2017 -0700
Committer: gatorsmile 
Committed: Thu Oct 26 17:56:29 2017 -0700

--
 .../command/createDataSourceTables.scala| 35 +-
 .../datasources/HadoopFsRelation.scala  | 25 -
 .../org/apache/spark/sql/SQLQuerySuite.scala| 16 +
 .../hive/HiveExternalCatalogVersionsSuite.scala | 38 +++-
 4 files changed, 89 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb54f297/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2d89011..d05af89 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 
 /**
  * A command used to create a data source table.
@@ -87,14 +88,32 @@ case class CreateDataSourceTableCommand(table: 
CatalogTable, ignoreIfExists: Boo
   }
 }
 
-val newTable = table.copy(
-  schema = dataSource.schema,
-  partitionColumnNames = partitionColumnNames,
-  // If metastore partition management for file source tables is enabled, 
we start off with
-  // partition provider hive, but no partitions in the metastore. The user 
has to call
-  // `msck repair table` to populate the table partitions.
-  tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
-sessionState.conf.manageFilesourcePartitions)
+val newTable = dataSource match {
+  // Since Spark 2.1, we store the inferred schema of data source in 
metastore, to avoid
+  // inferring the schema again at read path. However if the data source 
has overlapped columns
+  // between data and partition schema, we can't store it in metastore as 
it breaks the
+  // assumption of table schema. Here we fallback to the behavior of Spark 
prior to 2.1, store
+  // empty schema in metastore and infer it at runtime. Note that this 
also means the new
+  // scalable partitioning handling feature(introduced at Spark 2.1) is 
disabled in this case.
+  case r: HadoopFsRelation if r.overlappedPartCols.nonEmpty =>
+logWarning("It is not recommended to create a table with overlapped 
data and partition " +
+  "columns, as Spark cannot store a valid table schema and has to 
infer it at runtime, " +
+  "which hurts performance. Please check your data files and remove 
the partition " +
+  "columns in it.")
+table.copy(schema = new StructType(), partitionColumnNames = Nil)
+
+  case _ =>
+table.copy(
+  schema = dataSource.schema,
+  partitionColumnNames = partitionColumnNames,
+  // If metastore partition management for file source tables is 
enabled, we start 

spark git commit: [SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema

2017-10-26 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/test2.2 [created] cb54f297a


[SPARK-22356][SQL] data source table should support overlapped columns between 
data and partition schema

This is a regression introduced by #14207. After Spark 2.1, we store the 
inferred schema when creating the table, to avoid inferring the schema again at 
the read path. However, there is one special case: overlapped columns between 
data and partition. This case breaks the assumption of the table schema that 
there is no overlap between the data and partition schema, and that partition 
columns should be at the end. The result is that, for Spark 2.1, the table scan 
has an incorrect schema that puts partition columns at the end. For Spark 2.2, 
we add a check in CatalogTable to validate the table schema, which fails in 
this case.

To fix this issue, a simple and safe approach is to fall back to the old 
behavior when overlapped columns are detected, i.e. store an empty schema in 
the metastore.

new regression test

Author: Wenchen Fan 

Closes #19579 from cloud-fan/bug2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb54f297
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb54f297
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb54f297

Branch: refs/heads/test2.2
Commit: cb54f297ae52690e6162b2bab9a3940d38ff82f2
Parents: 2839280
Author: Wenchen Fan 
Authored: Thu Oct 26 17:39:53 2017 -0700
Committer: gatorsmile 
Committed: Thu Oct 26 17:56:29 2017 -0700

--
 .../command/createDataSourceTables.scala| 35 +-
 .../datasources/HadoopFsRelation.scala  | 25 -
 .../org/apache/spark/sql/SQLQuerySuite.scala| 16 +
 .../hive/HiveExternalCatalogVersionsSuite.scala | 38 +++-
 4 files changed, 89 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb54f297/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2d89011..d05af89 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 
 /**
  * A command used to create a data source table.
@@ -87,14 +88,32 @@ case class CreateDataSourceTableCommand(table: 
CatalogTable, ignoreIfExists: Boo
   }
 }
 
-val newTable = table.copy(
-  schema = dataSource.schema,
-  partitionColumnNames = partitionColumnNames,
-  // If metastore partition management for file source tables is enabled, 
we start off with
-  // partition provider hive, but no partitions in the metastore. The user 
has to call
-  // `msck repair table` to populate the table partitions.
-  tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
-sessionState.conf.manageFilesourcePartitions)
+val newTable = dataSource match {
+  // Since Spark 2.1, we store the inferred schema of data source in 
metastore, to avoid
+  // inferring the schema again at read path. However if the data source 
has overlapped columns
+  // between data and partition schema, we can't store it in metastore as 
it breaks the
+  // assumption of table schema. Here we fallback to the behavior of Spark 
prior to 2.1, store
+  // empty schema in metastore and infer it at runtime. Note that this 
also means the new
+  // scalable partitioning handling feature(introduced at Spark 2.1) is 
disabled in this case.
+  case r: HadoopFsRelation if r.overlappedPartCols.nonEmpty =>
+logWarning("It is not recommended to create a table with overlapped 
data and partition " +
+  "columns, as Spark cannot store a valid table schema and has to 
infer it at runtime, " +
+  "which hurts performance. Please check your data files and remove 
the partition " +
+  "columns in it.")
+table.copy(schema = new StructType(), partitionColumnNames = Nil)
+
+  case _ =>
+table.copy(
+  schema = dataSource.schema,
+  partitionColumnNames = partitionColumnNames,
+  // If metastore partition management for file source tables is 
enabled, we start off with
+  

spark git commit: [SPARK-22355][SQL] Dataset.collect is not threadsafe

2017-10-26 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 a607ddc52 -> 2839280ad


[SPARK-22355][SQL] Dataset.collect is not threadsafe

It's possible that users create a `Dataset` and call `collect` on it from many 
threads at the same time. Currently `Dataset#collect` just calls 
`encoder.fromRow` to convert Spark rows to objects of type T, and this encoder 
is per-Dataset. This means `Dataset#collect` is not thread-safe, because the 
encoder uses a projection to write the object to a reusable row.

This PR fixes the problem by creating a new projection when calling 
`Dataset#collect`, so that there is a reusable row per method call instead of 
per Dataset.
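
As an illustration (not code from the patch; the data and thread count are 
arbitrary), the usage pattern this change makes safe looks roughly like:

```scala
// Hedged sketch of concurrent collect() calls on one shared Dataset.
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.sql.SparkSession

object ConcurrentCollectDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("concurrent-collect").getOrCreate()
    import spark.implicits._

    val ds = spark.createDataset(1 to 1000) // one Dataset shared by all threads
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8))

    // Before this fix, the bound encoder (and its single reusable row) was shared
    // per Dataset, so concurrent collect() calls could interfere with each other.
    val futures = (1 to 8).map(_ => Future(ds.collect().sum))
    val sums = futures.map(Await.result(_, Duration.Inf))
    assert(sums.forall(_ == (1 to 1000).sum))

    spark.stop()
  }
}
```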

N/A

Author: Wenchen Fan 

Closes #19577 from cloud-fan/encoder.

(cherry picked from commit 5c3a1f3fad695317c2fff1243cdb9b3ceb25c317)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2839280a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2839280a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2839280a

Branch: refs/heads/branch-2.2
Commit: 2839280adc930593c64a74892fec79dcc666d468
Parents: a607ddc
Author: Wenchen Fan 
Authored: Thu Oct 26 17:51:16 2017 -0700
Committer: gatorsmile 
Committed: Thu Oct 26 17:52:26 2017 -0700

--
 .../scala/org/apache/spark/sql/Dataset.scala| 33 +---
 1 file changed, 22 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2839280a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a775fb8..1acbad9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.ParseException
@@ -195,15 +196,10 @@ class Dataset[T] private[sql](
*/
   private[sql] implicit val exprEnc: ExpressionEncoder[T] = encoderFor(encoder)
 
-  /**
-   * Encoder is used mostly as a container of serde expressions in Dataset.  
We build logical
-   * plans by these serde expressions and execute it within the query 
framework.  However, for
-   * performance reasons we may want to use encoder as a function to 
deserialize internal rows to
-   * custom objects, e.g. collect.  Here we resolve and bind the encoder so 
that we can call its
-   * `fromRow` method later.
-   */
-  private val boundEnc =
-exprEnc.resolveAndBind(logicalPlan.output, 
sparkSession.sessionState.analyzer)
+  // The deserializer expression which can be used to build a projection and 
turn rows to objects
+  // of type T, after collecting rows to the driver side.
+  private val deserializer =
+exprEnc.resolveAndBind(logicalPlan.output, 
sparkSession.sessionState.analyzer).deserializer
 
   private implicit def classTag = exprEnc.clsTag
 
@@ -2418,7 +2414,15 @@ class Dataset[T] private[sql](
*/
   def toLocalIterator(): java.util.Iterator[T] = {
 withAction("toLocalIterator", queryExecution) { plan =>
-  plan.executeToIterator().map(boundEnc.fromRow).asJava
+  // This projection writes output to a `InternalRow`, which means 
applying this projection is
+  // not thread-safe. Here we create the projection inside this method to 
make `Dataset`
+  // thread-safe.
+  val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
+  plan.executeToIterator().map { row =>
+// The row returned by SafeProjection is `SpecificInternalRow`, which 
ignore the data type
+// parameter of its `get` method, so it's safe to use null here.
+objProj(row).get(0, null).asInstanceOf[T]
+  }.asJava
 }
   }
 
@@ -2851,7 +2855,14 @@ class Dataset[T] private[sql](
* Collect all elements from a spark plan.
*/
   private def collectFromPlan(plan: SparkPlan): Array[T] = {
-plan.executeCollect().map(boundEnc.fromRow)
+// This projection writes output to a `InternalRow`, which means applying 
this projection is not
+// thread-safe. Here we create the projection inside this method to make 
`Dataset` thread-safe.
+

spark git commit: [SPARK-22355][SQL] Dataset.collect is not threadsafe

2017-10-26 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 9b262f6a0 -> 5c3a1f3fa


[SPARK-22355][SQL] Dataset.collect is not threadsafe

## What changes were proposed in this pull request?

It's possible that users create a `Dataset` and call `collect` on it from many 
threads at the same time. Currently `Dataset#collect` just calls 
`encoder.fromRow` to convert Spark rows to objects of type T, and this encoder 
is per-Dataset. This means `Dataset#collect` is not thread-safe, because the 
encoder uses a projection to write the object to a reusable row.

This PR fixes the problem by creating a new projection when calling 
`Dataset#collect`, so that there is a reusable row per method call instead of 
per Dataset.

## How was this patch tested?

N/A

Author: Wenchen Fan 

Closes #19577 from cloud-fan/encoder.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c3a1f3f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c3a1f3f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c3a1f3f

Branch: refs/heads/master
Commit: 5c3a1f3fad695317c2fff1243cdb9b3ceb25c317
Parents: 9b262f6
Author: Wenchen Fan 
Authored: Thu Oct 26 17:51:16 2017 -0700
Committer: gatorsmile 
Committed: Thu Oct 26 17:51:16 2017 -0700

--
 .../scala/org/apache/spark/sql/Dataset.scala| 33 +---
 1 file changed, 22 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5c3a1f3f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index b70dfc0..0e23983 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
@@ -198,15 +199,10 @@ class Dataset[T] private[sql](
*/
   private[sql] implicit val exprEnc: ExpressionEncoder[T] = encoderFor(encoder)
 
-  /**
-   * Encoder is used mostly as a container of serde expressions in Dataset.  
We build logical
-   * plans by these serde expressions and execute it within the query 
framework.  However, for
-   * performance reasons we may want to use encoder as a function to 
deserialize internal rows to
-   * custom objects, e.g. collect.  Here we resolve and bind the encoder so 
that we can call its
-   * `fromRow` method later.
-   */
-  private val boundEnc =
-exprEnc.resolveAndBind(logicalPlan.output, 
sparkSession.sessionState.analyzer)
+  // The deserializer expression which can be used to build a projection and 
turn rows to objects
+  // of type T, after collecting rows to the driver side.
+  private val deserializer =
+exprEnc.resolveAndBind(logicalPlan.output, 
sparkSession.sessionState.analyzer).deserializer
 
   private implicit def classTag = exprEnc.clsTag
 
@@ -2661,7 +2657,15 @@ class Dataset[T] private[sql](
*/
   def toLocalIterator(): java.util.Iterator[T] = {
 withAction("toLocalIterator", queryExecution) { plan =>
-  plan.executeToIterator().map(boundEnc.fromRow).asJava
+  // This projection writes output to a `InternalRow`, which means 
applying this projection is
+  // not thread-safe. Here we create the projection inside this method to 
make `Dataset`
+  // thread-safe.
+  val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
+  plan.executeToIterator().map { row =>
+// The row returned by SafeProjection is `SpecificInternalRow`, which 
ignore the data type
+// parameter of its `get` method, so it's safe to use null here.
+objProj(row).get(0, null).asInstanceOf[T]
+  }.asJava
 }
   }
 
@@ -3102,7 +3106,14 @@ class Dataset[T] private[sql](
* Collect all elements from a spark plan.
*/
   private def collectFromPlan(plan: SparkPlan): Array[T] = {
-plan.executeCollect().map(boundEnc.fromRow)
+// This projection writes output to a `InternalRow`, which means applying 
this projection is not
+// thread-safe. Here we create the projection inside this method to make 
`Dataset` thread-safe.
+val objProj = 

spark git commit: [SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema

2017-10-26 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 8e9863531 -> 9b262f6a0


[SPARK-22356][SQL] data source table should support overlapped columns between 
data and partition schema

## What changes were proposed in this pull request?

This is a regression introduced by #14207. After Spark 2.1, we store the 
inferred schema when creating the table, to avoid inferring the schema again at 
the read path. However, there is one special case: overlapped columns between 
data and partition. This case breaks the assumption of the table schema that 
there is no overlap between the data and partition schema, and that partition 
columns should be at the end. The result is that, for Spark 2.1, the table scan 
has an incorrect schema that puts partition columns at the end. For Spark 2.2, 
we add a check in CatalogTable to validate the table schema, which fails in 
this case.

To fix this issue, a simple and safe approach is to fall back to the old 
behavior when overlapped columns are detected, i.e. store an empty schema in 
the metastore.

## How was this patch tested?

new regression test

Author: Wenchen Fan 

Closes #19579 from cloud-fan/bug2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b262f6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b262f6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b262f6a

Branch: refs/heads/master
Commit: 9b262f6a08c0c1b474d920d49b9fdd574c401d39
Parents: 8e98635
Author: Wenchen Fan 
Authored: Thu Oct 26 17:39:53 2017 -0700
Committer: gatorsmile 
Committed: Thu Oct 26 17:39:53 2017 -0700

--
 .../command/createDataSourceTables.scala| 35 +-
 .../datasources/HadoopFsRelation.scala  | 25 -
 .../org/apache/spark/sql/SQLQuerySuite.scala| 16 +
 .../hive/HiveExternalCatalogVersionsSuite.scala | 38 +++-
 4 files changed, 89 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b262f6a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 9e39079..306f43d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 
 /**
  * A command used to create a data source table.
@@ -85,14 +86,32 @@ case class CreateDataSourceTableCommand(table: 
CatalogTable, ignoreIfExists: Boo
   }
 }
 
-val newTable = table.copy(
-  schema = dataSource.schema,
-  partitionColumnNames = partitionColumnNames,
-  // If metastore partition management for file source tables is enabled, 
we start off with
-  // partition provider hive, but no partitions in the metastore. The user 
has to call
-  // `msck repair table` to populate the table partitions.
-  tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
-sessionState.conf.manageFilesourcePartitions)
+val newTable = dataSource match {
+  // Since Spark 2.1, we store the inferred schema of data source in 
metastore, to avoid
+  // inferring the schema again at read path. However if the data source 
has overlapped columns
+  // between data and partition schema, we can't store it in metastore as 
it breaks the
+  // assumption of table schema. Here we fallback to the behavior of Spark 
prior to 2.1, store
+  // empty schema in metastore and infer it at runtime. Note that this 
also means the new
+  // scalable partitioning handling feature(introduced at Spark 2.1) is 
disabled in this case.
+  case r: HadoopFsRelation if r.overlappedPartCols.nonEmpty =>
+logWarning("It is not recommended to create a table with overlapped 
data and partition " +
+  "columns, as Spark cannot store a valid table schema and has to 
infer it at runtime, " +
+  "which hurts performance. Please check your data files and remove 
the partition " +
+  "columns in it.")
+table.copy(schema = new StructType(), partitionColumnNames = Nil)
+
+  case _ =>
+table.copy(
+  schema = dataSource.schema,
+  partitionColumnNames = partitionColumnNames,
+  // If 

spark git commit: [SPARK-22366] Support ignoring missing files

2017-10-26 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 5415963d2 -> 8e9863531


[SPARK-22366] Support ignoring missing files

## What changes were proposed in this pull request?

Add a flag "spark.sql.files.ignoreMissingFiles" to parallel the existing flag 
"spark.sql.files.ignoreCorruptFiles".

## How was this patch tested?

new unit test

Author: Jose Torres 

Closes #19581 from joseph-torres/SPARK-22366.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e986353
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e986353
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e986353

Branch: refs/heads/master
Commit: 8e9863531bebbd4d83eafcbc2b359b8bd0ac5734
Parents: 5415963
Author: Jose Torres 
Authored: Thu Oct 26 16:55:30 2017 -0700
Committer: Shixiong Zhu 
Committed: Thu Oct 26 16:55:30 2017 -0700

--
 .../org/apache/spark/sql/internal/SQLConf.scala |  8 +
 .../sql/execution/datasources/FileScanRDD.scala | 13 +---
 .../datasources/parquet/ParquetQuerySuite.scala | 33 
 3 files changed, 50 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8e986353/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4cfe53b..21e4685 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -614,6 +614,12 @@ object SQLConf {
 .booleanConf
 .createWithDefault(false)
 
+  val IGNORE_MISSING_FILES = buildConf("spark.sql.files.ignoreMissingFiles")
+.doc("Whether to ignore missing files. If true, the Spark jobs will 
continue to run when " +
+  "encountering missing files and the contents that have been read will 
still be returned.")
+.booleanConf
+.createWithDefault(false)
+
   val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile")
 .doc("Maximum number of records to write out to a single file. " +
   "If this value is zero or negative, there is no limit.")
@@ -1014,6 +1020,8 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
 
+  def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
+
   def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE)
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)

http://git-wip-us.apache.org/repos/asf/spark/blob/8e986353/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 9df2073..8731ee8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -66,6 +66,7 @@ class FileScanRDD(
   extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
 
   private val ignoreCorruptFiles = 
sparkSession.sessionState.conf.ignoreCorruptFiles
+  private val ignoreMissingFiles = 
sparkSession.sessionState.conf.ignoreMissingFiles
 
   override def compute(split: RDDPartition, context: TaskContext): 
Iterator[InternalRow] = {
 val iterator = new Iterator[Object] with AutoCloseable {
@@ -142,7 +143,7 @@ class FileScanRDD(
   // Sets InputFileBlockHolder for the file block's information
   InputFileBlockHolder.set(currentFile.filePath, currentFile.start, 
currentFile.length)
 
-  if (ignoreCorruptFiles) {
+  if (ignoreMissingFiles || ignoreCorruptFiles) {
 currentIterator = new NextIterator[Object] {
   // The readFunction may read some bytes before consuming the 
iterator, e.g.,
   // vectorized Parquet reader. Here we use lazy val to delay the 
creation of
@@ -158,9 +159,13 @@ class FileScanRDD(
 null
   }
 } catch {
-  // Throw FileNotFoundException even `ignoreCorruptFiles` is 
true
-  case e: FileNotFoundException => throw e
-  case e @ (_: RuntimeException | _: IOException) =>
+  case e: FileNotFoundException if ignoreMissingFiles =>
+logWarning(s"Skipped missing file: $currentFile", e)
+finished = true
+   

spark git commit: [SPARK-22131][MESOS] Mesos driver secrets

2017-10-26 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 4f8dc6b01 -> 5415963d2


[SPARK-22131][MESOS] Mesos driver secrets

## Background

In #18837, ArtRand added Mesos secrets support to the dispatcher. **This PR is 
to add the same secrets support to the drivers.** This means if the secret 
configs are set, the driver will launch executors that have access to either 
env or file-based secrets.

One use case for this is to support TLS in the driver <=> executor 
communication.
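
A hedged configuration sketch (not from the PR) of the secret properties 
involved, expressed as SparkConf settings; the property names appear in the 
docs diff below, and the values here are made up:

```scala
// Hedged sketch: the "contents x destination" choices for driver secrets.
// Property names come from the docs diff below; values are hypothetical.
import org.apache.spark.SparkConf

object MesosSecretConfDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // Contents (1): secret given by value...
      .set("spark.mesos.driver.secret.values", "guessme")
      // ...or contents (2): secret referenced by name in a Mesos secret store:
      // .set("spark.mesos.driver.secret.names", "password")
      // Destination (a): expose it as an environment variable in the driver...
      .set("spark.mesos.driver.secret.envkeys", "DB_PASSWORD")
      // ...or destination (b): write it to a file relative to the container work dir:
      // .set("spark.mesos.driver.secret.filenames", "db-password")
    conf.getAll.foreach { case (k, v) => println(s"$k=$v") }
  }
}
```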

## What changes were proposed in this pull request?

Most of the changes are a refactor of the dispatcher secrets support (#18837) - 
moving it to a common place that can be used by both the dispatcher and 
drivers. The same goes for the unit tests.

## How was this patch tested?

There are four config combinations: [env or file-based] x [value or reference 
secret]. For each combination:
- Added a unit test.
- Tested in DC/OS.

Author: Susan X. Huynh 

Closes #19437 from susanxhuynh/sh-mesos-driver-secret.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5415963d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5415963d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5415963d

Branch: refs/heads/master
Commit: 5415963d2caaf95604211419ffc4e29fff38e1d7
Parents: 4f8dc6b
Author: Susan X. Huynh 
Authored: Thu Oct 26 16:13:48 2017 -0700
Committer: Marcelo Vanzin 
Committed: Thu Oct 26 16:13:48 2017 -0700

--
 docs/running-on-mesos.md| 111 +++---
 .../org/apache/spark/deploy/mesos/config.scala  |  64 
 .../cluster/mesos/MesosClusterScheduler.scala   | 138 -
 .../MesosCoarseGrainedSchedulerBackend.scala|  31 +++-
 .../MesosFineGrainedSchedulerBackend.scala  |   4 +-
 .../mesos/MesosSchedulerBackendUtil.scala   |  92 +++-
 .../mesos/MesosClusterSchedulerSuite.scala  | 150 +++
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  34 -
 .../mesos/MesosSchedulerBackendUtilSuite.scala  |   7 +-
 .../spark/scheduler/cluster/mesos/Utils.scala   | 107 +
 10 files changed, 434 insertions(+), 304 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/docs/running-on-mesos.md
--
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index e0944bc..b7e3e64 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -485,39 +485,106 @@ See the [configuration page](configuration.html) for 
information on Spark config
 
 
 
-  spark.mesos.driver.secret.envkeys
-  (none)
   
-A comma-separated list that, if set, the contents of the secret referenced
-by spark.mesos.driver.secret.names or spark.mesos.driver.secret.values 
will be
-set to the provided environment variable in the driver's process.
+spark.mesos.driver.secret.values,
+spark.mesos.driver.secret.names,
+spark.mesos.executor.secret.values,
+spark.mesos.executor.secret.names,
   
-  
-  
-spark.mesos.driver.secret.filenames
   (none)
   
-A comma-separated list that, if set, the contents of the secret referenced 
by
-spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be
-written to the provided file. Paths are relative to the container's work
-directory.  Absolute paths must already exist.  Consult the Mesos Secret
-protobuf for more information.
+
+  A secret is specified by its contents and destination. These properties
+  specify a secret's contents. To specify a secret's destination, see the 
cell below.
+
+
+  You can specify a secret's contents either (1) by value or (2) by 
reference.
+
+
+  (1) To specify a secret by value, set the
+  spark.mesos.[driver|executor].secret.values
+  property, to make the secret available in the driver or executors.
+  For example, to make a secret password "guessme" available to the driver 
process, set:
+
+  spark.mesos.driver.secret.values=guessme
+
+
+  (2) To specify a secret that has been placed in a secret store
+  by reference, specify its name within the secret store
+  by setting the spark.mesos.[driver|executor].secret.names
+  property. For example, to make a secret password named "password" in a 
secret store
+  available to the driver process, set:
+
+  spark.mesos.driver.secret.names=password
+
+
+  Note: To use a secret store, make sure one has been integrated with 
Mesos via a custom
+  http://mesos.apache.org/documentation/latest/secrets/;>SecretResolver
+  module.
+
+
+  To specify multiple secrets, provide a comma-separated list:
+
+  

spark git commit: [SPARK-22328][CORE] ClosureCleaner should not miss referenced superclass fields

2017-10-26 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 24fe7ccba -> a607ddc52


[SPARK-22328][CORE] ClosureCleaner should not miss referenced superclass fields

When the given closure uses fields defined in a super class, `ClosureCleaner` 
can't find them and doesn't set them properly, so those fields end up with null 
values.
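
A hedged sketch of the pattern the fix targets (not the test added by the PR; 
whether the problem actually shows up depends on how the enclosing closures are 
nested, e.g. in the REPL):

```scala
// Hedged sketch: a closure that reads a field declared in a super class.
import org.apache.spark.{SparkConf, SparkContext}

abstract class Base extends Serializable {
  val numberInBase = 10 // field declared in the super class
}

class Job extends Base {
  def run(sc: SparkContext): Array[Int] = {
    // The closure references `numberInBase`, which lives in Base, not Job.
    // ClosureCleaner previously missed such superclass fields when populating
    // accessed fields, which is what this change fixes.
    sc.parallelize(1 to 5).map(_ + numberInBase).collect()
  }
}

object ClosureCleanerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("closure-demo"))
    try println(new Job().run(sc).toSeq) finally sc.stop()
  }
}
```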

Added test.

Author: Liang-Chi Hsieh 

Closes #19556 from viirya/SPARK-22328.

(cherry picked from commit 4f8dc6b01ea787243a38678ea8199fbb0814cffc)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a607ddc5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a607ddc5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a607ddc5

Branch: refs/heads/branch-2.2
Commit: a607ddc52e933151327f9b097a453eff38fcf748
Parents: 24fe7cc
Author: Liang-Chi Hsieh 
Authored: Thu Oct 26 21:41:45 2017 +0100
Committer: Wenchen Fan 
Committed: Thu Oct 26 21:44:17 2017 +0100

--
 .../org/apache/spark/util/ClosureCleaner.scala  | 73 
 .../apache/spark/util/ClosureCleanerSuite.scala | 72 +++
 2 files changed, 133 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a607ddc5/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala 
b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 489688c..2d5d3f8 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -91,6 +91,54 @@ private[spark] object ClosureCleaner extends Logging {
 (seen - obj.getClass).toList
   }
 
+  /** Initializes the accessed fields for outer classes and their super 
classes. */
+  private def initAccessedFields(
+  accessedFields: Map[Class[_], Set[String]],
+  outerClasses: Seq[Class[_]]): Unit = {
+for (cls <- outerClasses) {
+  var currentClass = cls
+  assert(currentClass != null, "The outer class can't be null.")
+
+  while (currentClass != null) {
+accessedFields(currentClass) = Set.empty[String]
+currentClass = currentClass.getSuperclass()
+  }
+}
+  }
+
+  /** Sets accessed fields for given class in clone object based on given 
object. */
+  private def setAccessedFields(
+  outerClass: Class[_],
+  clone: AnyRef,
+  obj: AnyRef,
+  accessedFields: Map[Class[_], Set[String]]): Unit = {
+for (fieldName <- accessedFields(outerClass)) {
+  val field = outerClass.getDeclaredField(fieldName)
+  field.setAccessible(true)
+  val value = field.get(obj)
+  field.set(clone, value)
+}
+  }
+
+  /** Clones a given object and sets accessed fields in cloned object. */
+  private def cloneAndSetFields(
+  parent: AnyRef,
+  obj: AnyRef,
+  outerClass: Class[_],
+  accessedFields: Map[Class[_], Set[String]]): AnyRef = {
+val clone = instantiateClass(outerClass, parent)
+
+var currentClass = outerClass
+assert(currentClass != null, "The outer class can't be null.")
+
+while (currentClass != null) {
+  setAccessedFields(currentClass, clone, obj, accessedFields)
+  currentClass = currentClass.getSuperclass()
+}
+
+clone
+  }
+
   /**
* Clean the given closure in place.
*
@@ -200,9 +248,8 @@ private[spark] object ClosureCleaner extends Logging {
   logDebug(s" + populating accessed fields because this is the starting 
closure")
   // Initialize accessed fields with the outer classes first
   // This step is needed to associate the fields to the correct classes 
later
-  for (cls <- outerClasses) {
-accessedFields(cls) = Set[String]()
-  }
+  initAccessedFields(accessedFields, outerClasses)
+
   // Populate accessed fields by visiting all fields and methods accessed 
by this and
   // all of its inner closures. If transitive cleaning is enabled, this 
may recursively
   // visits methods that belong to other classes in search of transitively 
referenced fields.
@@ -248,13 +295,8 @@ private[spark] object ClosureCleaner extends Logging {
   // required fields from the original object. We need the parent here 
because the Java
   // language specification requires the first constructor parameter of 
any closure to be
   // its enclosing object.
-  val clone = instantiateClass(cls, parent)
-  for (fieldName <- accessedFields(cls)) {
-val field = cls.getDeclaredField(fieldName)
-field.setAccessible(true)
-val value = field.get(obj)
-field.set(clone, value)
-  }
+ 

spark git commit: [SPARK-22328][CORE] ClosureCleaner should not miss referenced superclass fields

2017-10-26 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 0e9a750a8 -> 4f8dc6b01


[SPARK-22328][CORE] ClosureCleaner should not miss referenced superclass fields

## What changes were proposed in this pull request?

When the given closure uses fields defined in a super class, `ClosureCleaner` 
can't find them and doesn't set them properly, so those fields end up with null 
values.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh 

Closes #19556 from viirya/SPARK-22328.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f8dc6b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f8dc6b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f8dc6b0

Branch: refs/heads/master
Commit: 4f8dc6b01ea787243a38678ea8199fbb0814cffc
Parents: 0e9a750
Author: Liang-Chi Hsieh 
Authored: Thu Oct 26 21:41:45 2017 +0100
Committer: Wenchen Fan 
Committed: Thu Oct 26 21:41:45 2017 +0100

--
 .../org/apache/spark/util/ClosureCleaner.scala  | 73 
 .../apache/spark/util/ClosureCleanerSuite.scala | 72 +++
 2 files changed, 133 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f8dc6b0/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala 
b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 48a1d7b..dfece5d 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -91,6 +91,54 @@ private[spark] object ClosureCleaner extends Logging {
 (seen - obj.getClass).toList
   }
 
+  /** Initializes the accessed fields for outer classes and their super 
classes. */
+  private def initAccessedFields(
+  accessedFields: Map[Class[_], Set[String]],
+  outerClasses: Seq[Class[_]]): Unit = {
+for (cls <- outerClasses) {
+  var currentClass = cls
+  assert(currentClass != null, "The outer class can't be null.")
+
+  while (currentClass != null) {
+accessedFields(currentClass) = Set.empty[String]
+currentClass = currentClass.getSuperclass()
+  }
+}
+  }
+
+  /** Sets accessed fields for given class in clone object based on given 
object. */
+  private def setAccessedFields(
+  outerClass: Class[_],
+  clone: AnyRef,
+  obj: AnyRef,
+  accessedFields: Map[Class[_], Set[String]]): Unit = {
+for (fieldName <- accessedFields(outerClass)) {
+  val field = outerClass.getDeclaredField(fieldName)
+  field.setAccessible(true)
+  val value = field.get(obj)
+  field.set(clone, value)
+}
+  }
+
+  /** Clones a given object and sets accessed fields in cloned object. */
+  private def cloneAndSetFields(
+  parent: AnyRef,
+  obj: AnyRef,
+  outerClass: Class[_],
+  accessedFields: Map[Class[_], Set[String]]): AnyRef = {
+val clone = instantiateClass(outerClass, parent)
+
+var currentClass = outerClass
+assert(currentClass != null, "The outer class can't be null.")
+
+while (currentClass != null) {
+  setAccessedFields(currentClass, clone, obj, accessedFields)
+  currentClass = currentClass.getSuperclass()
+}
+
+clone
+  }
+
   /**
* Clean the given closure in place.
*
@@ -202,9 +250,8 @@ private[spark] object ClosureCleaner extends Logging {
   logDebug(s" + populating accessed fields because this is the starting 
closure")
   // Initialize accessed fields with the outer classes first
   // This step is needed to associate the fields to the correct classes 
later
-  for (cls <- outerClasses) {
-accessedFields(cls) = Set.empty[String]
-  }
+  initAccessedFields(accessedFields, outerClasses)
+
   // Populate accessed fields by visiting all fields and methods accessed 
by this and
   // all of its inner closures. If transitive cleaning is enabled, this 
may recursively
   // visits methods that belong to other classes in search of transitively 
referenced fields.
@@ -250,13 +297,8 @@ private[spark] object ClosureCleaner extends Logging {
   // required fields from the original object. We need the parent here 
because the Java
   // language specification requires the first constructor parameter of 
any closure to be
   // its enclosing object.
-  val clone = instantiateClass(cls, parent)
-  for (fieldName <- accessedFields(cls)) {
-val field = cls.getDeclaredField(fieldName)
-field.setAccessible(true)
-val value = field.get(obj)
-field.set(clone, value)
-  }
+  val clone = cloneAndSetFields(parent, 

spark git commit: [SPARK-20643][CORE] Add listener implementation to collect app state.

2017-10-26 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master a83d8d5ad -> 0e9a750a8


[SPARK-20643][CORE] Add listener implementation to collect app state.

The initial listener code is based on the existing JobProgressListener (and others),
and tries to mimic their behavior as much as possible. The change also includes
some minor code movement so that some types and methods from the initial history
server code can be reused.

The code introduces a few mutable versions of public API types, used internally,
to make it easier to update information without ugly copy methods, and also to
make certain updates cheaper.

Note the code here is not 100% correct. This is meant as a building ground for
the UI integration in the next milestones. As different parts of the UI are
ported, fixes will be made to the different parts of this code to account
for the needed behavior.

I also added annotations to API types so that Jackson is able to correctly
deserialize options, sequences and maps that store primitive types.
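
As a hedged illustration (these are not the actual Spark API types, and the 
field names are made up), the kind of Jackson annotation this refers to tells 
jackson-module-scala what the boxed content type of an Option or Seq of 
primitives is:

```scala
// Hedged sketch: contentAs lets Jackson deserialize boxed primitive contents.
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule

case class TaskSummary(
    id: String,
    @JsonDeserialize(contentAs = classOf[java.lang.Long])
    durationMs: Option[Long],
    @JsonDeserialize(contentAs = classOf[java.lang.Integer])
    attemptIds: Seq[Int])

object JacksonDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    val json = """{"id":"t-1","durationMs":42,"attemptIds":[1,2]}"""
    val summary = mapper.readValue(json, classOf[TaskSummary])
    println(summary) // e.g. TaskSummary(t-1,Some(42),List(1, 2))
  }
}
```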

Author: Marcelo Vanzin 

Closes #19383 from vanzin/SPARK-20643.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e9a750a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e9a750a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e9a750a

Branch: refs/heads/master
Commit: 0e9a750a8d389b3a17834584d31c204c77c6970d
Parents: a83d8d5
Author: Marcelo Vanzin 
Authored: Thu Oct 26 11:05:16 2017 -0500
Committer: Imran Rashid 
Committed: Thu Oct 26 11:05:16 2017 -0500

--
 .../apache/spark/util/kvstore/KVTypeInfo.java   |   2 +
 .../org/apache/spark/util/kvstore/LevelDB.java  |   2 +-
 .../apache/spark/status/api/v1/StageStatus.java |   3 +-
 .../deploy/history/FsHistoryProvider.scala  |  37 +-
 .../apache/spark/deploy/history/config.scala|   6 -
 .../apache/spark/status/AppStatusListener.scala | 531 ++
 .../scala/org/apache/spark/status/KVUtils.scala |  73 ++
 .../org/apache/spark/status/LiveEntity.scala| 526 ++
 .../spark/status/api/v1/AllStagesResource.scala |   4 +-
 .../org/apache/spark/status/api/v1/api.scala|  11 +-
 .../org/apache/spark/status/storeTypes.scala|  98 +++
 .../deploy/history/FsHistoryProviderSuite.scala |   2 +-
 .../spark/status/AppStatusListenerSuite.scala   | 690 +++
 project/MimaExcludes.scala  |   2 +
 14 files changed, 1942 insertions(+), 45 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
--
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
index a2b077e..870b484 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
@@ -46,6 +46,7 @@ public class KVTypeInfo {
   KVIndex idx = f.getAnnotation(KVIndex.class);
   if (idx != null) {
 checkIndex(idx, indices);
+f.setAccessible(true);
 indices.put(idx.value(), idx);
 f.setAccessible(true);
 accessors.put(idx.value(), new FieldAccessor(f));
@@ -58,6 +59,7 @@ public class KVTypeInfo {
 checkIndex(idx, indices);
 Preconditions.checkArgument(m.getParameterTypes().length == 0,
   "Annotated method %s::%s should not have any parameters.", 
type.getName(), m.getName());
+m.setAccessible(true);
 indices.put(idx.value(), idx);
 m.setAccessible(true);
 accessors.put(idx.value(), new MethodAccessor(m));

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
--
diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index ff48b15..4f9e10c 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -76,7 +76,7 @@ public class LevelDB implements KVStore {
 this.types = new ConcurrentHashMap<>();
 
 Options options = new Options();
-options.createIfMissing(!path.exists());
+options.createIfMissing(true);
 this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
 
 byte[] versionData = db().get(STORE_VERSION_KEY);


spark git commit: [SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR

2017-10-26 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3e77b7481 -> aa023fddb


[SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR

## What changes were proposed in this pull request?

This PR proposes to revive `stringsAsFactors` option in collect API, which was 
mistakenly removed in 
https://github.com/apache/spark/commit/71a138cd0e0a14e8426f97877e3b52a562bbd02c.

Simply, it casts `character` to `factor` if it meets the condition, 
`stringsAsFactors && is.character(vec)` in primitive type conversion.

## How was this patch tested?

Unit test in `R/pkg/tests/fulltests/test_sparkSQL.R`.

Author: hyukjinkwon 

Closes #19551 from HyukjinKwon/SPARK-17902.

(cherry picked from commit a83d8d5adcb4e0061e43105767242ba9770dda96)
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa023fdd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa023fdd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa023fdd

Branch: refs/heads/branch-2.1
Commit: aa023fddb0abb6cf8ded94ac695ba7b0edb02022
Parents: 3e77b74
Author: hyukjinkwon 
Authored: Thu Oct 26 20:54:36 2017 +0900
Committer: hyukjinkwon 
Committed: Thu Oct 26 20:55:14 2017 +0900

--
 R/pkg/R/DataFrame.R   | 3 +++
 R/pkg/tests/fulltests/test_sparkSQL.R | 6 ++
 2 files changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aa023fdd/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index d0f0979..5899fa8 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1173,6 +1173,9 @@ setMethod("collect",
 vec <- do.call(c, col)
 stopifnot(class(vec) != "list")
 class(vec) <- PRIMITIVE_TYPES[[colType]]
+if (is.character(vec) && stringsAsFactors) {
+  vec <- as.factor(vec)
+}
 df[[colIndex]] <- vec
   } else {
 df[[colIndex]] <- col

http://git-wip-us.apache.org/repos/asf/spark/blob/aa023fdd/R/pkg/tests/fulltests/test_sparkSQL.R
--
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
index fedca67..0b88e47 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -417,6 +417,12 @@ test_that("create DataFrame with different data types", {
   expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
 })
 
+test_that("SPARK-17902: collect() with stringsAsFactors enabled", {
+  df <- suppressWarnings(collect(createDataFrame(iris), stringsAsFactors = 
TRUE))
+  expect_equal(class(iris$Species), class(df$Species))
+  expect_equal(iris$Species, df$Species)
+})
+
 test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
   df <- data.frame(
 id = 1:2,





spark git commit: [SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR

2017-10-26 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 3073344a2 -> a83d8d5ad


[SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR

## What changes were proposed in this pull request?

This PR proposes to revive `stringsAsFactors` option in collect API, which was 
mistakenly removed in 
https://github.com/apache/spark/commit/71a138cd0e0a14e8426f97877e3b52a562bbd02c.

Simply, it casts `character` to `factor` if it meets the condition, 
`stringsAsFactors && is.character(vec)` in primitive type conversion.

## How was this patch tested?

Unit test in `R/pkg/tests/fulltests/test_sparkSQL.R`.

Author: hyukjinkwon 

Closes #19551 from HyukjinKwon/SPARK-17902.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a83d8d5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a83d8d5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a83d8d5a

Branch: refs/heads/master
Commit: a83d8d5adcb4e0061e43105767242ba9770dda96
Parents: 3073344
Author: hyukjinkwon 
Authored: Thu Oct 26 20:54:36 2017 +0900
Committer: hyukjinkwon 
Committed: Thu Oct 26 20:54:36 2017 +0900

--
 R/pkg/R/DataFrame.R   | 3 +++
 R/pkg/tests/fulltests/test_sparkSQL.R | 6 ++
 2 files changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a83d8d5a/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 176bb3b..aaa3349 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1191,6 +1191,9 @@ setMethod("collect",
 vec <- do.call(c, col)
 stopifnot(class(vec) != "list")
 class(vec) <- PRIMITIVE_TYPES[[colType]]
+if (is.character(vec) && stringsAsFactors) {
+  vec <- as.factor(vec)
+}
 df[[colIndex]] <- vec
   } else {
 df[[colIndex]] <- col

http://git-wip-us.apache.org/repos/asf/spark/blob/a83d8d5a/R/pkg/tests/fulltests/test_sparkSQL.R
--
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
index 4382ef2..0c8118a 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -499,6 +499,12 @@ test_that("create DataFrame with different data types", {
   expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
 })
 
+test_that("SPARK-17902: collect() with stringsAsFactors enabled", {
+  df <- suppressWarnings(collect(createDataFrame(iris), stringsAsFactors = 
TRUE))
+  expect_equal(class(iris$Species), class(df$Species))
+  expect_equal(iris$Species, df$Species)
+})
+
 test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
   df <- data.frame(
 id = 1:2,





spark git commit: [SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR

2017-10-26 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 d2dc175a1 -> 24fe7ccba


[SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR

## What changes were proposed in this pull request?

This PR proposes to revive `stringsAsFactors` option in collect API, which was 
mistakenly removed in 
https://github.com/apache/spark/commit/71a138cd0e0a14e8426f97877e3b52a562bbd02c.

Simply, it casts `character` to `factor` if it meets the condition, 
`stringsAsFactors && is.character(vec)` in primitive type conversion.

## How was this patch tested?

Unit test in `R/pkg/tests/fulltests/test_sparkSQL.R`.

Author: hyukjinkwon 

Closes #19551 from HyukjinKwon/SPARK-17902.

(cherry picked from commit a83d8d5adcb4e0061e43105767242ba9770dda96)
Signed-off-by: hyukjinkwon 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24fe7ccb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24fe7ccb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24fe7ccb

Branch: refs/heads/branch-2.2
Commit: 24fe7ccbacd913c19fa40199fd5511aaf55c6bfa
Parents: d2dc175
Author: hyukjinkwon 
Authored: Thu Oct 26 20:54:36 2017 +0900
Committer: hyukjinkwon 
Committed: Thu Oct 26 20:55:00 2017 +0900

--
 R/pkg/R/DataFrame.R   | 3 +++
 R/pkg/tests/fulltests/test_sparkSQL.R | 6 ++
 2 files changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/24fe7ccb/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 3859fa8..c0a954d 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1174,6 +1174,9 @@ setMethod("collect",
 vec <- do.call(c, col)
 stopifnot(class(vec) != "list")
 class(vec) <- PRIMITIVE_TYPES[[colType]]
+if (is.character(vec) && stringsAsFactors) {
+  vec <- as.factor(vec)
+}
 df[[colIndex]] <- vec
   } else {
 df[[colIndex]] <- col

http://git-wip-us.apache.org/repos/asf/spark/blob/24fe7ccb/R/pkg/tests/fulltests/test_sparkSQL.R
--
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
index 12d8fef..50c60fe 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -483,6 +483,12 @@ test_that("create DataFrame with different data types", {
   expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE))
 })
 
+test_that("SPARK-17902: collect() with stringsAsFactors enabled", {
+  df <- suppressWarnings(collect(createDataFrame(iris), stringsAsFactors = 
TRUE))
+  expect_equal(class(iris$Species), class(df$Species))
+  expect_equal(iris$Species, df$Species)
+})
+
 test_that("SPARK-17811: can create DataFrame containing NA as date and time", {
   df <- data.frame(
 id = 1:2,





spark git commit: [SPARK-21840][CORE] Add trait that allows conf to be directly set in application.

2017-10-26 Thread jshao
Repository: spark
Updated Branches:
  refs/heads/master 592cfeab9 -> 3073344a2


[SPARK-21840][CORE] Add trait that allows conf to be directly set in 
application.

Currently SparkSubmit uses system properties to propagate configuration to
applications. This makes it hard to implement features such as SPARK-11035,
which would allow multiple applications to be started in the same JVM. The
current code would cause the config data from multiple apps to get mixed
up.

This change introduces a new trait, currently internal to Spark, that allows
the app configuration to be passed directly to the application, without
having to use system properties. The current "call main() method" behavior
is maintained as an implementation of this new trait. This will be useful
to allow multiple cluster mode apps to be submitted from the same JVM.
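
A hedged sketch of what an implementation of the new trait could look like, 
based on the trait definition shown in the diff below (the trait is 
private[spark], so this is illustrative rather than user-facing API):

```scala
// Hedged sketch only: SparkApplication is internal (private[spark]), so real
// code like this would have to live inside Spark itself.
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication
import org.apache.spark.sql.SparkSession

class MyApplication extends SparkApplication {
  // Configuration arrives directly as a SparkConf rather than through system
  // properties, so several applications could coexist in one JVM.
  override def start(args: Array[String], conf: SparkConf): Unit = {
    val spark = SparkSession.builder().config(conf).getOrCreate()
    try {
      println(s"args=${args.mkString(",")} rows=${spark.range(100).count()}")
    } finally {
      spark.stop()
    }
  }
}
```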

As part of this, SparkSubmit was modified to collect all configuration
directly into a SparkConf instance. Most of the changes are to tests so
they use SparkConf instead of an opaque map.

Tested with existing and added unit tests.

Author: Marcelo Vanzin 

Closes #19519 from vanzin/SPARK-21840.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3073344a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3073344a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3073344a

Branch: refs/heads/master
Commit: 3073344a2551fb198d63f2114a519ab97904cb55
Parents: 592cfea
Author: Marcelo Vanzin 
Authored: Thu Oct 26 15:50:27 2017 +0800
Committer: jerryshao 
Committed: Thu Oct 26 15:50:27 2017 +0800

--
 .../apache/spark/deploy/SparkApplication.scala  |  55 +
 .../org/apache/spark/deploy/SparkSubmit.scala   | 160 +++---
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 213 +++
 .../deploy/rest/StandaloneRestSubmitSuite.scala |   4 +-
 4 files changed, 257 insertions(+), 175 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3073344a/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala b/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala
new file mode 100644
index 000..118b460
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.lang.reflect.Modifier
+
+import org.apache.spark.SparkConf
+
+/**
+ * Entry point for a Spark application. Implementations must provide a no-argument constructor.
+ */
+private[spark] trait SparkApplication {
+
+  def start(args: Array[String], conf: SparkConf): Unit
+
+}
+
+/**
+ * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
+ *
+ * Configuration is propagated to the application via system properties, so running multiple
+ * of these in the same JVM may lead to undefined behavior due to configuration leaks.
+ */
+private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
+
+  override def start(args: Array[String], conf: SparkConf): Unit = {
+val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
+if (!Modifier.isStatic(mainMethod.getModifiers)) {
+  throw new IllegalStateException("The main method in the given main class must be static")
+}
+
+val sysProps = conf.getAll.toMap
+sysProps.foreach { case (k, v) =>
+  sys.props(k) = v
+}
+
+mainMethod.invoke(null, args)
+  }
+
+}

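As a hedged illustration of how the old "call main()" path maps onto the trait
above (the wiring and the class name com.example.LegacyMain are hypothetical;
the SparkApplication and JavaMainApplication names come from the diff):

    package org.apache.spark.deploy

    import org.apache.spark.SparkConf

    object JavaMainApplicationExample {
      def main(args: Array[String]): Unit = {
        // A legacy class with a static main() is still supported, but it is
        // invoked through the SparkApplication trait rather than called directly.
        val mainClass = Class.forName("com.example.LegacyMain") // hypothetical class
        val app: SparkApplication = new JavaMainApplication(mainClass)
        app.start(Array("--input", "/tmp/data"), new SparkConf(loadDefaults = false))
      }
    }
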
http://git-wip-us.apache.org/repos/asf/spark/blob/3073344a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b7e6d0e..73b956e 100644
--- 

spark git commit: [SPARK-22308] Support alternative unit testing styles in external applications

2017-10-26 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 5433be44c -> 592cfeab9


[SPARK-22308] Support alternative unit testing styles in external applications

## What changes were proposed in this pull request?
Support unit tests of external code (i.e., applications that use Spark) that are
written in scalatest styles other than FunSuite.  SharedSparkContext already
supports this, but SharedSQLContext does not.

I've introduced SharedSparkSession as a parent of SharedSQLContext, written so
that it supports all scalatest styles.

## How was this patch tested?
Three new unit test suites were added that simply exercise the FunSpec,
FlatSpec, and WordSpec styles.
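
As a rough, illustrative sketch (not part of this patch) of the style this
enables, a WordSpec suite could mix in the new SharedSparkSession; the suite
name and the query are made up:

    package org.apache.spark.sql.test

    import org.scalatest.WordSpec

    // Illustrative only: relies on SharedSparkSession providing a `spark` session.
    class RangeWordSpecSuite extends WordSpec with SharedSparkSession {
      "A SparkSession" should {
        "count a small range" in {
          assert(spark.range(5).count() == 5)
        }
      }
    }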

Author: Nathan Kronenfeld 

Closes #19529 from nkronenfeld/alternative-style-tests-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/592cfeab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/592cfeab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/592cfeab

Branch: refs/heads/master
Commit: 592cfeab9caeff955d115a1ca5014ede7d402907
Parents: 5433be4
Author: Nathan Kronenfeld 
Authored: Thu Oct 26 00:29:49 2017 -0700
Committer: gatorsmile 
Committed: Thu Oct 26 00:29:49 2017 -0700

--
 .../org/apache/spark/SharedSparkContext.scala   |  17 +-
 .../spark/sql/catalyst/plans/PlanTest.scala |  10 +-
 .../spark/sql/test/GenericFlatSpecSuite.scala   |  45 +
 .../spark/sql/test/GenericFunSpecSuite.scala|  47 +
 .../spark/sql/test/GenericWordSpecSuite.scala   |  51 ++
 .../apache/spark/sql/test/SQLTestUtils.scala| 173 ++-
 .../spark/sql/test/SharedSQLContext.scala   |  84 +
 .../spark/sql/test/SharedSparkSession.scala | 119 +
 8 files changed, 381 insertions(+), 165 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/592cfeab/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
--
diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index 6aedcb1..1aa1c42 100644
--- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
@@ -29,10 +29,23 @@ trait SharedSparkContext extends BeforeAndAfterAll with BeforeAndAfterEach { sel
 
   var conf = new SparkConf(false)
 
+  /**
+   * Initialize the [[SparkContext]].  Generally, this is just called from beforeAll; however, in
+   * tests using styles other than FunSuite, there is often code that relies on the context between
+   * test group constructs and the actual tests, which may need this context.  It is purely a
+   * semantic difference, but semantically, it makes more sense to call 'initializeContext' between
+   * a 'describe' and an 'it' call than it does to call 'beforeAll'.
+   */
+  protected def initializeContext(): Unit = {
+if (null == _sc) {
+  _sc = new SparkContext(
+"local[4]", "test", conf.set("spark.hadoop.fs.file.impl", 
classOf[DebugFilesystem].getName))
+}
+  }
+
   override def beforeAll() {
 super.beforeAll()
-_sc = new SparkContext(
-  "local[4]", "test", conf.set("spark.hadoop.fs.file.impl", 
classOf[DebugFilesystem].getName))
+initializeContext()
   }
 
   override def afterAll() {

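For illustration (not part of the diff), a FunSpec-style suite could use
initializeContext() so the context is usable in code registered between
'describe' and 'it'; the suite name and the tiny job are invented:

    package org.apache.spark

    import org.scalatest.FunSpec

    // Illustrative only: the body of the describe block runs at construction
    // time, before beforeAll, so initializeContext() must be called for `sc`
    // to be available there.
    class SharedContextFunSpecExample extends FunSpec with SharedSparkContext {
      describe("a shared SparkContext") {
        initializeContext()
        val parallelism = sc.defaultParallelism

        it("can parallelize a small range") {
          assert(sc.parallelize(1 to parallelism).count() == parallelism)
        }
      }
    }
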
http://git-wip-us.apache.org/repos/asf/spark/blob/592cfeab/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
--
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 10bdfaf..82c5307 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.plans
 
+import org.scalatest.Suite
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer
@@ -29,7 +31,13 @@ import org.apache.spark.sql.internal.SQLConf
 /**
  * Provides helper methods for comparing plans.
  */
-trait PlanTest extends SparkFunSuite with PredicateHelper {
+trait PlanTest extends SparkFunSuite with PlanTestBase
+
+/**
+ * Provides helper methods for comparing plans, but without the overhead of
+ * mandating a FunSuite.
+ */
+trait PlanTestBase extends PredicateHelper { self: Suite =>
 
   // TODO(gatorsmile): remove this from PlanTest and all the analyzer rules
   protected def conf = SQLConf.get