[spark] Git Push Summary
Repository: spark
Updated Branches:
  refs/heads/test2.2 [deleted] cb54f297a

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 2839280ad -> cb54f297a


[SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema

This is a regression introduced by #14207. Since Spark 2.1, we store the inferred schema when creating the table, to avoid inferring the schema again on the read path. However, there is one special case: columns that overlap between the data and partition schemas. This case breaks the table-schema assumption that the data and partition schemas do not overlap and that partition columns come at the end. As a result, in Spark 2.1 the table scan has an incorrect schema that puts partition columns at the end. In Spark 2.2, we added a check in CatalogTable to validate the table schema, which fails in this case.

To fix this issue, a simple and safe approach is to fall back to the old behavior when overlapped columns are detected, i.e. store an empty schema in the metastore.

new regression test

Author: Wenchen Fan
Closes #19579 from cloud-fan/bug2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb54f297
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb54f297
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb54f297

Branch: refs/heads/branch-2.2
Commit: cb54f297ae52690e6162b2bab9a3940d38ff82f2
Parents: 2839280
Author: Wenchen Fan
Authored: Thu Oct 26 17:39:53 2017 -0700
Committer: gatorsmile
Committed: Thu Oct 26 17:56:29 2017 -0700

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        | 35 +-
 .../datasources/HadoopFsRelation.scala          | 25 -
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 16 +
 .../hive/HiveExternalCatalogVersionsSuite.scala | 38 +++-
 4 files changed, 89 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb54f297/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2d89011..d05af89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 
 /**
  * A command used to create a data source table.
@@ -87,14 +88,32 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
-    val newTable = table.copy(
-      schema = dataSource.schema,
-      partitionColumnNames = partitionColumnNames,
-      // If metastore partition management for file source tables is enabled, we start off with
-      // partition provider hive, but no partitions in the metastore. The user has to call
-      // `msck repair table` to populate the table partitions.
-      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
-        sessionState.conf.manageFilesourcePartitions)
+    val newTable = dataSource match {
+      // Since Spark 2.1, we store the inferred schema of data source in metastore, to avoid
+      // inferring the schema again at read path. However if the data source has overlapped columns
+      // between data and partition schema, we can't store it in metastore as it breaks the
+      // assumption of table schema. Here we fallback to the behavior of Spark prior to 2.1, store
+      // empty schema in metastore and infer it at runtime. Note that this also means the new
+      // scalable partitioning handling feature(introduced at Spark 2.1) is disabled in this case.
+      case r: HadoopFsRelation if r.overlappedPartCols.nonEmpty =>
+        logWarning("It is not recommended to create a table with overlapped data and partition " +
+          "columns, as Spark cannot store a valid table schema and has to infer it at runtime, " +
+          "which hurts performance. Please check your data files and remove the partition " +
+          "columns in it.")
+        table.copy(schema = new StructType(), partitionColumnNames = Nil)
+
+      case _ =>
+        table.copy(
+          schema = dataSource.schema,
+          partitionColumnNames = partitionColumnNames,
+          // If metastore partition management for file source tables is enabled, we start
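The fallback rule in this commit can be illustrated without Spark. The following is a minimal plain-Scala sketch, not Spark's code: it detects partition columns that also appear in the data schema and, if there are any, chooses to persist an empty schema and no partition columns, otherwise the data columns followed by the partition columns. `Column`, `StoredTable`, `overlappedPartCols` and `tableToStore` are hypothetical stand-ins for `StructField`, `CatalogTable`, `HadoopFsRelation.overlappedPartCols` and the logic in `CreateDataSourceTableCommand`; comparing names case-insensitively is an assumption of the sketch (Spark SQL is case-insensitive by default).

```scala
import java.util.Locale

object OverlapFallback {
  // Hypothetical, simplified models; Spark's real types are StructField and CatalogTable.
  final case class Column(name: String, dataType: String)
  final case class StoredTable(schema: Seq[Column], partitionColumnNames: Seq[String])

  // Assumption: column names are compared case-insensitively.
  private def key(c: Column): String = c.name.toLowerCase(Locale.ROOT)

  // Partition columns whose names also occur in the schema of the data files.
  def overlappedPartCols(data: Seq[Column], partition: Seq[Column]): Seq[Column] = {
    val dataKeys = data.map(key).toSet
    partition.filter(p => dataKeys.contains(key(p)))
  }

  // Decides what would be persisted in the metastore when the table is created.
  def tableToStore(data: Seq[Column], partition: Seq[Column]): StoredTable =
    if (overlappedPartCols(data, partition).nonEmpty) {
      // Fallback: store an empty schema and no partition columns, infer both at read time.
      StoredTable(schema = Nil, partitionColumnNames = Nil)
    } else {
      // No overlap: data columns first, partition columns at the end.
      StoredTable(schema = data ++ partition, partitionColumnNames = partition.map(_.name))
    }
}
```

With data columns `id` and `P` and a partition column `p`, this sketch reports `p` as overlapped and stores nothing; with a disjoint partition column `q`, it stores `id, P, q`.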
spark git commit: [SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema
Repository: spark
Updated Branches:
  refs/heads/test2.2 [created] cb54f297a


[SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema

This is a regression introduced by #14207. Since Spark 2.1, we store the inferred schema when creating the table, to avoid inferring the schema again on the read path. However, there is one special case: columns that overlap between the data and partition schemas. This case breaks the table-schema assumption that the data and partition schemas do not overlap and that partition columns come at the end. As a result, in Spark 2.1 the table scan has an incorrect schema that puts partition columns at the end. In Spark 2.2, we added a check in CatalogTable to validate the table schema, which fails in this case.

To fix this issue, a simple and safe approach is to fall back to the old behavior when overlapped columns are detected, i.e. store an empty schema in the metastore.

new regression test

Author: Wenchen Fan
Closes #19579 from cloud-fan/bug2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb54f297
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb54f297
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb54f297

Branch: refs/heads/test2.2
Commit: cb54f297ae52690e6162b2bab9a3940d38ff82f2
Parents: 2839280
Author: Wenchen Fan
Authored: Thu Oct 26 17:39:53 2017 -0700
Committer: gatorsmile
Committed: Thu Oct 26 17:56:29 2017 -0700

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        | 35 +-
 .../datasources/HadoopFsRelation.scala          | 25 -
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 16 +
 .../hive/HiveExternalCatalogVersionsSuite.scala | 38 +++-
 4 files changed, 89 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cb54f297/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2d89011..d05af89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 
 /**
  * A command used to create a data source table.
@@ -87,14 +88,32 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
-    val newTable = table.copy(
-      schema = dataSource.schema,
-      partitionColumnNames = partitionColumnNames,
-      // If metastore partition management for file source tables is enabled, we start off with
-      // partition provider hive, but no partitions in the metastore. The user has to call
-      // `msck repair table` to populate the table partitions.
-      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
-        sessionState.conf.manageFilesourcePartitions)
+    val newTable = dataSource match {
+      // Since Spark 2.1, we store the inferred schema of data source in metastore, to avoid
+      // inferring the schema again at read path. However if the data source has overlapped columns
+      // between data and partition schema, we can't store it in metastore as it breaks the
+      // assumption of table schema. Here we fallback to the behavior of Spark prior to 2.1, store
+      // empty schema in metastore and infer it at runtime. Note that this also means the new
+      // scalable partitioning handling feature(introduced at Spark 2.1) is disabled in this case.
+      case r: HadoopFsRelation if r.overlappedPartCols.nonEmpty =>
+        logWarning("It is not recommended to create a table with overlapped data and partition " +
+          "columns, as Spark cannot store a valid table schema and has to infer it at runtime, " +
+          "which hurts performance. Please check your data files and remove the partition " +
+          "columns in it.")
+        table.copy(schema = new StructType(), partitionColumnNames = Nil)
+
+      case _ =>
+        table.copy(
+          schema = dataSource.schema,
+          partitionColumnNames = partitionColumnNames,
+          // If metastore partition management for file source tables is enabled, we start off with
+
spark git commit: [SPARK-22355][SQL] Dataset.collect is not threadsafe
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 a607ddc52 -> 2839280ad


[SPARK-22355][SQL] Dataset.collect is not threadsafe

It's possible that users create a `Dataset` and call `collect` on it from many threads at the same time. Currently `Dataset#collect` just calls `encoder.fromRow` to convert Spark rows to objects of type T, and this encoder is per-Dataset. This means `Dataset#collect` is not thread-safe, because the encoder uses a projection that writes the object to a re-usable row.

This PR fixes the problem by creating a new projection when `Dataset#collect` is called, so that there is one re-usable row per method call instead of one per Dataset.

N/A

Author: Wenchen Fan
Closes #19577 from cloud-fan/encoder.

(cherry picked from commit 5c3a1f3fad695317c2fff1243cdb9b3ceb25c317)
Signed-off-by: gatorsmile


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2839280a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2839280a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2839280a

Branch: refs/heads/branch-2.2
Commit: 2839280adc930593c64a74892fec79dcc666d468
Parents: a607ddc
Author: Wenchen Fan
Authored: Thu Oct 26 17:51:16 2017 -0700
Committer: gatorsmile
Committed: Thu Oct 26 17:52:26 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala    | 33 +---
 1 file changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2839280a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a775fb8..1acbad9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.ParseException
@@ -195,15 +196,10 @@ class Dataset[T] private[sql](
    */
   private[sql] implicit val exprEnc: ExpressionEncoder[T] = encoderFor(encoder)
 
-  /**
-   * Encoder is used mostly as a container of serde expressions in Dataset. We build logical
-   * plans by these serde expressions and execute it within the query framework. However, for
-   * performance reasons we may want to use encoder as a function to deserialize internal rows to
-   * custom objects, e.g. collect. Here we resolve and bind the encoder so that we can call its
-   * `fromRow` method later.
-   */
-  private val boundEnc =
-    exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer)
+  // The deserializer expression which can be used to build a projection and turn rows to objects
+  // of type T, after collecting rows to the driver side.
+  private val deserializer =
+    exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer).deserializer
 
   private implicit def classTag = exprEnc.clsTag
 
@@ -2418,7 +2414,15 @@ class Dataset[T] private[sql](
    */
   def toLocalIterator(): java.util.Iterator[T] = {
     withAction("toLocalIterator", queryExecution) { plan =>
-      plan.executeToIterator().map(boundEnc.fromRow).asJava
+      // This projection writes output to a `InternalRow`, which means applying this projection is
+      // not thread-safe. Here we create the projection inside this method to make `Dataset`
+      // thread-safe.
+      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
+      plan.executeToIterator().map { row =>
+        // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
+        // parameter of its `get` method, so it's safe to use null here.
+        objProj(row).get(0, null).asInstanceOf[T]
+      }.asJava
     }
   }
 
@@ -2851,7 +2855,14 @@ class Dataset[T] private[sql](
    * Collect all elements from a spark plan.
    */
   private def collectFromPlan(plan: SparkPlan): Array[T] = {
-    plan.executeCollect().map(boundEnc.fromRow)
+    // This projection writes output to a `InternalRow`, which means applying this projection is not
+    // thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
+
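The hazard this commit fixes can be reproduced in plain Scala. The sketch below is an analogy, not Spark code: `ReusingProjection` is a hypothetical stand-in for a generated projection that writes every result into one reusable buffer, which is the property the commit message identifies. A shared instance (like the old per-Dataset `boundEnc`) hands every caller the same buffer, while building a fresh projection inside each call, as `collectFromPlan` and `toLocalIterator` now do, gives each call its own buffer; each value is read out immediately, as `objProj(row).get(0, null)` does.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PerCallProjection {
  // Hypothetical projection that reuses a single mutable output slot on every call,
  // so one instance must not be shared between threads.
  final class ReusingProjection {
    private val buffer = new Array[Any](1)
    def apply(row: Int): Array[Any] = {
      buffer(0) = s"obj-$row"
      buffer
    }
  }

  // Per-call pattern: each invocation builds its own projection, and each value is
  // copied out of the buffer before the next row overwrites it.
  def collect(rows: Seq[Int]): Seq[String] = {
    val proj = new ReusingProjection
    rows.map(r => proj(r)(0).asInstanceOf[String])
  }

  // Runs several collect calls concurrently on the global execution context.
  def collectConcurrently(batches: Seq[Seq[Int]]): Seq[Seq[String]] = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val futures = batches.map(batch => Future(collect(batch)))
    Await.result(Future.sequence(futures), 30.seconds)
  }
}
```

Calling one shared `ReusingProjection` twice returns the same array, now holding only the second result; that aliasing is what makes a per-Dataset projection unsafe under concurrent `collect` calls.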
spark git commit: [SPARK-22355][SQL] Dataset.collect is not threadsafe
Repository: spark
Updated Branches:
  refs/heads/master 9b262f6a0 -> 5c3a1f3fa


[SPARK-22355][SQL] Dataset.collect is not threadsafe

## What changes were proposed in this pull request?

It's possible that users create a `Dataset` and call `collect` on it from many threads at the same time. Currently `Dataset#collect` just calls `encoder.fromRow` to convert Spark rows to objects of type T, and this encoder is per-Dataset. This means `Dataset#collect` is not thread-safe, because the encoder uses a projection that writes the object to a re-usable row.

This PR fixes the problem by creating a new projection when `Dataset#collect` is called, so that there is one re-usable row per method call instead of one per Dataset.

## How was this patch tested?

N/A

Author: Wenchen Fan
Closes #19577 from cloud-fan/encoder.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c3a1f3f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c3a1f3f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c3a1f3f

Branch: refs/heads/master
Commit: 5c3a1f3fad695317c2fff1243cdb9b3ceb25c317
Parents: 9b262f6
Author: Wenchen Fan
Authored: Thu Oct 26 17:51:16 2017 -0700
Committer: gatorsmile
Committed: Thu Oct 26 17:51:16 2017 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala    | 33 +---
 1 file changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5c3a1f3f/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index b70dfc0..0e23983 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
@@ -198,15 +199,10 @@ class Dataset[T] private[sql](
    */
   private[sql] implicit val exprEnc: ExpressionEncoder[T] = encoderFor(encoder)
 
-  /**
-   * Encoder is used mostly as a container of serde expressions in Dataset. We build logical
-   * plans by these serde expressions and execute it within the query framework. However, for
-   * performance reasons we may want to use encoder as a function to deserialize internal rows to
-   * custom objects, e.g. collect. Here we resolve and bind the encoder so that we can call its
-   * `fromRow` method later.
-   */
-  private val boundEnc =
-    exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer)
+  // The deserializer expression which can be used to build a projection and turn rows to objects
+  // of type T, after collecting rows to the driver side.
+  private val deserializer =
+    exprEnc.resolveAndBind(logicalPlan.output, sparkSession.sessionState.analyzer).deserializer
 
   private implicit def classTag = exprEnc.clsTag
 
@@ -2661,7 +2657,15 @@ class Dataset[T] private[sql](
    */
   def toLocalIterator(): java.util.Iterator[T] = {
     withAction("toLocalIterator", queryExecution) { plan =>
-      plan.executeToIterator().map(boundEnc.fromRow).asJava
+      // This projection writes output to a `InternalRow`, which means applying this projection is
+      // not thread-safe. Here we create the projection inside this method to make `Dataset`
+      // thread-safe.
+      val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
+      plan.executeToIterator().map { row =>
+        // The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type
+        // parameter of its `get` method, so it's safe to use null here.
+        objProj(row).get(0, null).asInstanceOf[T]
+      }.asJava
     }
   }
 
@@ -3102,7 +3106,14 @@ class Dataset[T] private[sql](
    * Collect all elements from a spark plan.
    */
   private def collectFromPlan(plan: SparkPlan): Array[T] = {
-    plan.executeCollect().map(boundEnc.fromRow)
+    // This projection writes output to a `InternalRow`, which means applying this projection is not
+    // thread-safe. Here we create the projection inside this method to make `Dataset` thread-safe.
+    val objProj =
spark git commit: [SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema
Repository: spark
Updated Branches:
  refs/heads/master 8e9863531 -> 9b262f6a0


[SPARK-22356][SQL] data source table should support overlapped columns between data and partition schema

## What changes were proposed in this pull request?

This is a regression introduced by #14207. Since Spark 2.1, we store the inferred schema when creating the table, to avoid inferring the schema again on the read path. However, there is one special case: columns that overlap between the data and partition schemas. This case breaks the table-schema assumption that the data and partition schemas do not overlap and that partition columns come at the end. As a result, in Spark 2.1 the table scan has an incorrect schema that puts partition columns at the end. In Spark 2.2, we added a check in CatalogTable to validate the table schema, which fails in this case.

To fix this issue, a simple and safe approach is to fall back to the old behavior when overlapped columns are detected, i.e. store an empty schema in the metastore.

## How was this patch tested?

new regression test

Author: Wenchen Fan
Closes #19579 from cloud-fan/bug2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b262f6a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b262f6a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b262f6a

Branch: refs/heads/master
Commit: 9b262f6a08c0c1b474d920d49b9fdd574c401d39
Parents: 8e98635
Author: Wenchen Fan
Authored: Thu Oct 26 17:39:53 2017 -0700
Committer: gatorsmile
Committed: Thu Oct 26 17:39:53 2017 -0700

----------------------------------------------------------------------
 .../command/createDataSourceTables.scala        | 35 +-
 .../datasources/HadoopFsRelation.scala          | 25 -
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 16 +
 .../hive/HiveExternalCatalogVersionsSuite.scala | 38 +++-
 4 files changed, 89 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9b262f6a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 9e39079..306f43d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 
 /**
  * A command used to create a data source table.
@@ -85,14 +86,32 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
       }
     }
 
-    val newTable = table.copy(
-      schema = dataSource.schema,
-      partitionColumnNames = partitionColumnNames,
-      // If metastore partition management for file source tables is enabled, we start off with
-      // partition provider hive, but no partitions in the metastore. The user has to call
-      // `msck repair table` to populate the table partitions.
-      tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
-        sessionState.conf.manageFilesourcePartitions)
+    val newTable = dataSource match {
+      // Since Spark 2.1, we store the inferred schema of data source in metastore, to avoid
+      // inferring the schema again at read path. However if the data source has overlapped columns
+      // between data and partition schema, we can't store it in metastore as it breaks the
+      // assumption of table schema. Here we fallback to the behavior of Spark prior to 2.1, store
+      // empty schema in metastore and infer it at runtime. Note that this also means the new
+      // scalable partitioning handling feature(introduced at Spark 2.1) is disabled in this case.
+      case r: HadoopFsRelation if r.overlappedPartCols.nonEmpty =>
+        logWarning("It is not recommended to create a table with overlapped data and partition " +
+          "columns, as Spark cannot store a valid table schema and has to infer it at runtime, " +
+          "which hurts performance. Please check your data files and remove the partition " +
+          "columns in it.")
+        table.copy(schema = new StructType(), partitionColumnNames = Nil)
+
+      case _ =>
+        table.copy(
+          schema = dataSource.schema,
+          partitionColumnNames = partitionColumnNames,
+          // If
spark git commit: [SPARK-22366] Support ignoring missing files
Repository: spark
Updated Branches:
  refs/heads/master 5415963d2 -> 8e9863531


[SPARK-22366] Support ignoring missing files

## What changes were proposed in this pull request?

Add a flag "spark.sql.files.ignoreMissingFiles" to parallel the existing flag "spark.sql.files.ignoreCorruptFiles".

## How was this patch tested?

new unit test

Author: Jose Torres
Closes #19581 from joseph-torres/SPARK-22366.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e986353
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e986353
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e986353

Branch: refs/heads/master
Commit: 8e9863531bebbd4d83eafcbc2b359b8bd0ac5734
Parents: 5415963
Author: Jose Torres
Authored: Thu Oct 26 16:55:30 2017 -0700
Committer: Shixiong Zhu
Committed: Thu Oct 26 16:55:30 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  8 +
 .../sql/execution/datasources/FileScanRDD.scala | 13 +---
 .../datasources/parquet/ParquetQuerySuite.scala | 33
 3 files changed, 50 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8e986353/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 4cfe53b..21e4685 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -614,6 +614,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val IGNORE_MISSING_FILES = buildConf("spark.sql.files.ignoreMissingFiles")
+    .doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " +
+      "encountering missing files and the contents that have been read will still be returned.")
+    .booleanConf
+    .createWithDefault(false)
+
   val MAX_RECORDS_PER_FILE = buildConf("spark.sql.files.maxRecordsPerFile")
     .doc("Maximum number of records to write out to a single file. " +
       "If this value is zero or negative, there is no limit.")
@@ -1014,6 +1020,8 @@ class SQLConf extends Serializable with Logging {
 
   def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
 
+  def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
+
   def maxRecordsPerFile: Long = getConf(MAX_RECORDS_PER_FILE)
 
   def useCompression: Boolean = getConf(COMPRESS_CACHED)


http://git-wip-us.apache.org/repos/asf/spark/blob/8e986353/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 9df2073..8731ee8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -66,6 +66,7 @@ class FileScanRDD(
     extends RDD[InternalRow](sparkSession.sparkContext, Nil) {
 
   private val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+  private val ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles
 
   override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {
     val iterator = new Iterator[Object] with AutoCloseable {
@@ -142,7 +143,7 @@ class FileScanRDD(
           // Sets InputFileBlockHolder for the file block's information
           InputFileBlockHolder.set(currentFile.filePath, currentFile.start, currentFile.length)
 
-          if (ignoreCorruptFiles) {
+          if (ignoreMissingFiles || ignoreCorruptFiles) {
             currentIterator = new NextIterator[Object] {
               // The readFunction may read some bytes before consuming the iterator, e.g.,
               // vectorized Parquet reader. Here we use lazy val to delay the creation of
@@ -158,9 +159,13 @@ class FileScanRDD(
                     null
                   }
                 } catch {
-                  // Throw FileNotFoundException even `ignoreCorruptFiles` is true
-                  case e: FileNotFoundException => throw e
-                  case e @ (_: RuntimeException | _: IOException) =>
+                  case e: FileNotFoundException if ignoreMissingFiles =>
+                    logWarning(s"Skipped missing file: $currentFile", e)
+                    finished = true
+
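The interplay between the two flags can be sketched in plain Scala. This is not `FileScanRDD`: `readAll` and `demoOpen` are hypothetical helpers that model a reader which fails either because a file is gone (`FileNotFoundException`) or because its contents are bad (another `IOException` or a `RuntimeException`). Because `FileNotFoundException` is a subclass of `IOException`, the missing-file cases are matched first, so a missing file still fails the read unless the missing-files flag is set, even when the corrupt-files flag is on. Keeping the rows read before a corruption error and skipping the rest of that file is an assumption of this sketch.

```scala
import java.io.{FileNotFoundException, IOException}
import scala.collection.mutable.ArrayBuffer

object SkipUnreadableFiles {
  // Reads every path in order. `ignoreMissing` and `ignoreCorrupt` play the roles of
  // spark.sql.files.ignoreMissingFiles and spark.sql.files.ignoreCorruptFiles.
  def readAll(
      paths: Seq[String],
      open: String => Iterator[String],
      ignoreMissing: Boolean,
      ignoreCorrupt: Boolean): List[String] = {
    val out = ArrayBuffer.empty[String]
    for (path <- paths) {
      try {
        val it = open(path)
        while (it.hasNext) out += it.next()
      } catch {
        // Missing files: skipped only when ignoreMissing is set, otherwise rethrown.
        case _: FileNotFoundException if ignoreMissing => ()
        case e: FileNotFoundException => throw e
        // Other read failures: keep rows read so far and skip the rest of the file.
        case _: RuntimeException | _: IOException if ignoreCorrupt => ()
      }
    }
    out.toList
  }

  // Hypothetical reader for the demo: "missing" does not exist, "corrupt" yields one
  // row and then fails, and any other path yields two rows.
  def demoOpen(path: String): Iterator[String] = path match {
    case "missing" => throw new FileNotFoundException(path)
    case "corrupt" =>
      new Iterator[String] {
        private var produced = 0
        def hasNext: Boolean = true
        def next(): String = {
          produced += 1
          if (produced == 1) "corrupt-row-1" else throw new IOException(s"bad block in $path")
        }
      }
    case other => Iterator(s"$other-row-1", s"$other-row-2")
  }
}
```

In a Spark session the real flag would be enabled like any other SQL configuration, for example with `spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")`.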
spark git commit: [SPARK-22131][MESOS] Mesos driver secrets
Repository: spark
Updated Branches:
  refs/heads/master 4f8dc6b01 -> 5415963d2


[SPARK-22131][MESOS] Mesos driver secrets

## Background

In #18837, ArtRand added Mesos secrets support to the dispatcher. **This PR is to add the same secrets support to the drivers.** This means that if the secret configs are set, the driver will launch executors that have access to either env or file-based secrets.

One use case for this is to support TLS in the driver <=> executor communication.

## What changes were proposed in this pull request?

Most of the changes are a refactor of the dispatcher secrets support (#18837): moving it to a common place that can be used by both the dispatcher and drivers. The same goes for the unit tests.

## How was this patch tested?

There are four config combinations: [env or file-based] x [value or reference secret]. For each combination:
- Added a unit test.
- Tested in DC/OS.

Author: Susan X. Huynh
Closes #19437 from susanxhuynh/sh-mesos-driver-secret.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5415963d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5415963d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5415963d

Branch: refs/heads/master
Commit: 5415963d2caaf95604211419ffc4e29fff38e1d7
Parents: 4f8dc6b
Author: Susan X. Huynh
Authored: Thu Oct 26 16:13:48 2017 -0700
Committer: Marcelo Vanzin
Committed: Thu Oct 26 16:13:48 2017 -0700

----------------------------------------------------------------------
 docs/running-on-mesos.md                        | 111 +++---
 .../org/apache/spark/deploy/mesos/config.scala  |  64
 .../cluster/mesos/MesosClusterScheduler.scala   | 138 -
 .../MesosCoarseGrainedSchedulerBackend.scala    |  31 +++-
 .../MesosFineGrainedSchedulerBackend.scala      |   4 +-
 .../mesos/MesosSchedulerBackendUtil.scala       |  92 +++-
 .../mesos/MesosClusterSchedulerSuite.scala      | 150 +++
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  34 -
 .../mesos/MesosSchedulerBackendUtilSuite.scala  |   7 +-
 .../spark/scheduler/cluster/mesos/Utils.scala   | 107 +
 10 files changed, 434 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5415963d/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index e0944bc..b7e3e64 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -485,39 +485,106 @@ See the [configuration page](configuration.html) for information on Spark config
-  spark.mesos.driver.secret.envkeys
-  (none)
-    A comma-separated list that, if set, the contents of the secret referenced
-    by spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be
-    set to the provided environment variable in the driver's process.
+    spark.mesos.driver.secret.values,
+    spark.mesos.driver.secret.names,
+    spark.mesos.executor.secret.values,
+    spark.mesos.executor.secret.names,
-  spark.mesos.driver.secret.filenames (none)
-    A comma-separated list that, if set, the contents of the secret referenced by
-    spark.mesos.driver.secret.names or spark.mesos.driver.secret.values will be
-    written to the provided file. Paths are relative to the container's work
-    directory. Absolute paths must already exist. Consult the Mesos Secret
-    protobuf for more information.
+      A secret is specified by its contents and destination. These properties
+      specify a secret's contents. To specify a secret's destination, see the cell below.
+      You can specify a secret's contents either (1) by value or (2) by reference.
+      (1) To specify a secret by value, set the
+      spark.mesos.[driver|executor].secret.values
+      property, to make the secret available in the driver or executors.
+      For example, to make a secret password "guessme" available to the driver process, set:
+        spark.mesos.driver.secret.values=guessme
+      (2) To specify a secret that has been placed in a secret store
+      by reference, specify its name within the secret store
+      by setting the spark.mesos.[driver|executor].secret.names
+      property. For example, to make a secret password named "password" in a secret store
+      available to the driver process, set:
+        spark.mesos.driver.secret.names=password
+      Note: To use a secret store, make sure one has been integrated with Mesos via a custom
+      SecretResolver (http://mesos.apache.org/documentation/latest/secrets/)
+      module.
+      To specify multiple secrets, provide a comma-separated list:
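The documentation in this diff describes secrets given as comma-separated lists, with a separate property naming where each secret should go. The following plain-Scala sketch shows one way such a configuration could be read; it is not the Mesos scheduler's implementation. It pairs the i-th entry of `spark.mesos.driver.secret.names` with the i-th entry of `spark.mesos.driver.secret.envkeys` (both property names appear in the diff). Positional pairing, whitespace trimming, rejecting mismatched list lengths, and the `MesosSecretConf` helper itself are assumptions made for illustration.

```scala
object MesosSecretConf {
  // Splits a comma-separated property value, trimming whitespace and dropping empty entries.
  def splitList(value: String): Seq[String] =
    value.split(",").iterator.map(_.trim).filter(_.nonEmpty).toList

  // Hypothetical helper: pairs each referenced secret name with the environment variable
  // that should receive it, assuming the two lists are matched by position.
  def pairSecretsWithEnvKeys(conf: Map[String, String]): Seq[(String, String)] = {
    def list(key: String): Seq[String] = conf.get(key).map(splitList).getOrElse(Nil)
    val names = list("spark.mesos.driver.secret.names")
    val envKeys = list("spark.mesos.driver.secret.envkeys")
    require(names.size == envKeys.size,
      s"expected one env key per secret, got ${names.size} names and ${envKeys.size} env keys")
    names.zip(envKeys)
  }
}
```

Failing fast on mismatched lengths is a choice of this sketch: it keeps a missing env key from silently dropping a secret.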
spark git commit: [SPARK-22328][CORE] ClosureCleaner should not miss referenced superclass fields
Repository: spark Updated Branches: refs/heads/branch-2.2 24fe7ccba -> a607ddc52 [SPARK-22328][CORE] ClosureCleaner should not miss referenced superclass fields When the given closure uses some fields defined in super class, `ClosureCleaner` can't figure them and don't set it properly. Those fields will be in null values. Added test. Author: Liang-Chi HsiehCloses #19556 from viirya/SPARK-22328. (cherry picked from commit 4f8dc6b01ea787243a38678ea8199fbb0814cffc) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a607ddc5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a607ddc5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a607ddc5 Branch: refs/heads/branch-2.2 Commit: a607ddc52e933151327f9b097a453eff38fcf748 Parents: 24fe7cc Author: Liang-Chi Hsieh Authored: Thu Oct 26 21:41:45 2017 +0100 Committer: Wenchen Fan Committed: Thu Oct 26 21:44:17 2017 +0100 -- .../org/apache/spark/util/ClosureCleaner.scala | 73 .../apache/spark/util/ClosureCleanerSuite.scala | 72 +++ 2 files changed, 133 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a607ddc5/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 489688c..2d5d3f8 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -91,6 +91,54 @@ private[spark] object ClosureCleaner extends Logging { (seen - obj.getClass).toList } + /** Initializes the accessed fields for outer classes and their super classes. 
*/
+  private def initAccessedFields(
+      accessedFields: Map[Class[_], Set[String]],
+      outerClasses: Seq[Class[_]]): Unit = {
+    for (cls <- outerClasses) {
+      var currentClass = cls
+      assert(currentClass != null, "The outer class can't be null.")
+
+      while (currentClass != null) {
+        accessedFields(currentClass) = Set.empty[String]
+        currentClass = currentClass.getSuperclass()
+      }
+    }
+  }
+
+  /** Sets accessed fields for given class in clone object based on given object. */
+  private def setAccessedFields(
+      outerClass: Class[_],
+      clone: AnyRef,
+      obj: AnyRef,
+      accessedFields: Map[Class[_], Set[String]]): Unit = {
+    for (fieldName <- accessedFields(outerClass)) {
+      val field = outerClass.getDeclaredField(fieldName)
+      field.setAccessible(true)
+      val value = field.get(obj)
+      field.set(clone, value)
+    }
+  }
+
+  /** Clones a given object and sets accessed fields in cloned object. */
+  private def cloneAndSetFields(
+      parent: AnyRef,
+      obj: AnyRef,
+      outerClass: Class[_],
+      accessedFields: Map[Class[_], Set[String]]): AnyRef = {
+    val clone = instantiateClass(outerClass, parent)
+
+    var currentClass = outerClass
+    assert(currentClass != null, "The outer class can't be null.")
+
+    while (currentClass != null) {
+      setAccessedFields(currentClass, clone, obj, accessedFields)
+      currentClass = currentClass.getSuperclass()
+    }
+
+    clone
+  }
+
   /**
    * Clean the given closure in place.
    *
@@ -200,9 +248,8 @@ private[spark] object ClosureCleaner extends Logging {
       logDebug(s" + populating accessed fields because this is the starting closure")
       // Initialize accessed fields with the outer classes first
       // This step is needed to associate the fields to the correct classes later
-      for (cls <- outerClasses) {
-        accessedFields(cls) = Set[String]()
-      }
+      initAccessedFields(accessedFields, outerClasses)
+
       // Populate accessed fields by visiting all fields and methods accessed by this and
       // all of its inner closures.
If transitive cleaning is enabled, this may recursively
       // visits methods that belong to other classes in search of transitively referenced fields.
@@ -248,13 +295,8 @@ private[spark] object ClosureCleaner extends Logging {
         // required fields from the original object. We need the parent here because the Java
         // language specification requires the first constructor parameter of any closure to be
         // its enclosing object.
-        val clone = instantiateClass(cls, parent)
-        for (fieldName <- accessedFields(cls)) {
-          val field = cls.getDeclaredField(fieldName)
-          field.setAccessible(true)
-          val value = field.get(obj)
-          field.set(clone, value)
-        }
+
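The bug fixed here comes down to a property of Java reflection: `Class.getDeclaredField` only finds fields declared on that exact class, so a field inherited from a superclass has to be looked up on the superclass itself. The sketch below walks the `getSuperclass()` chain in the same spirit as `cloneAndSetFields` in the diff. It is a simplified illustration, not Spark's `ClosureCleaner`; `BaseHolder`, `DerivedHolder`, and `SuperclassFieldCopy` are hypothetical, and the "clone" is an ordinary instance with its inherited field nulled to mimic an unpopulated copy.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class BaseHolder {
    String inherited = "base-default";
}

class DerivedHolder extends BaseHolder {
    int own = 0;
}

class SuperclassFieldCopy {

    /** True only if `name` is declared directly on `c`; inherited fields are not visible. */
    static boolean declaresDirectly(Class<?> c, String name) {
        try {
            c.getDeclaredField(name);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    /**
     * Copies the recorded fields from obj into clone, visiting obj's runtime class and then
     * each superclass, and looking every field up on the class that declares it.
     */
    static void copyAccessedFields(Object obj, Object clone, Map<Class<?>, Set<String>> accessed) {
        for (Class<?> c = obj.getClass(); c != null; c = c.getSuperclass()) {
            for (String name : accessed.getOrDefault(c, Set.of())) {
                try {
                    Field f = c.getDeclaredField(name);
                    f.setAccessible(true);
                    f.set(clone, f.get(obj));
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("Cannot copy " + c.getName() + "." + name, e);
                }
            }
        }
    }

    public static void main(String[] args) {
        DerivedHolder original = new DerivedHolder();
        original.inherited = "changed";
        original.own = 7;

        // Fields are recorded per declaring class, as in the diff's accessedFields map.
        Map<Class<?>, Set<String>> accessed = new HashMap<>();
        accessed.put(DerivedHolder.class, Set.of("own"));
        accessed.put(BaseHolder.class, Set.of("inherited"));

        DerivedHolder clone = new DerivedHolder();
        clone.inherited = null; // mimic a clone whose fields were never populated

        copyAccessedFields(original, clone, accessed);
        System.out.println(declaresDirectly(DerivedHolder.class, "inherited")); // false
        System.out.println(clone.own + " " + clone.inherited);                  // 7 changed
    }
}
```

If the loop stopped at `obj.getClass()`, `clone.inherited` would stay `null`, which matches the symptom the commit message describes.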
spark git commit: [SPARK-22328][CORE] ClosureCleaner should not miss referenced superclass fields
Repository: spark
Updated Branches:
  refs/heads/master 0e9a750a8 -> 4f8dc6b01

[SPARK-22328][CORE] ClosureCleaner should not miss referenced superclass fields

## What changes were proposed in this pull request?

When a closure uses fields defined in a superclass, `ClosureCleaner` fails to detect them and does not set them properly, so those fields end up null.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh

Closes #19556 from viirya/SPARK-22328.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f8dc6b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f8dc6b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f8dc6b0

Branch: refs/heads/master
Commit: 4f8dc6b01ea787243a38678ea8199fbb0814cffc
Parents: 0e9a750
Author: Liang-Chi Hsieh
Authored: Thu Oct 26 21:41:45 2017 +0100
Committer: Wenchen Fan
Committed: Thu Oct 26 21:41:45 2017 +0100

--
 .../org/apache/spark/util/ClosureCleaner.scala  | 73
 .../apache/spark/util/ClosureCleanerSuite.scala | 72 +++
 2 files changed, 133 insertions(+), 12 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4f8dc6b0/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 48a1d7b..dfece5d 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -91,6 +91,54 @@ private[spark] object ClosureCleaner extends Logging {
     (seen - obj.getClass).toList
   }
 
+  /** Initializes the accessed fields for outer classes and their super classes.
*/
+  private def initAccessedFields(
+      accessedFields: Map[Class[_], Set[String]],
+      outerClasses: Seq[Class[_]]): Unit = {
+    for (cls <- outerClasses) {
+      var currentClass = cls
+      assert(currentClass != null, "The outer class can't be null.")
+
+      while (currentClass != null) {
+        accessedFields(currentClass) = Set.empty[String]
+        currentClass = currentClass.getSuperclass()
+      }
+    }
+  }
+
+  /** Sets accessed fields for given class in clone object based on given object. */
+  private def setAccessedFields(
+      outerClass: Class[_],
+      clone: AnyRef,
+      obj: AnyRef,
+      accessedFields: Map[Class[_], Set[String]]): Unit = {
+    for (fieldName <- accessedFields(outerClass)) {
+      val field = outerClass.getDeclaredField(fieldName)
+      field.setAccessible(true)
+      val value = field.get(obj)
+      field.set(clone, value)
+    }
+  }
+
+  /** Clones a given object and sets accessed fields in cloned object. */
+  private def cloneAndSetFields(
+      parent: AnyRef,
+      obj: AnyRef,
+      outerClass: Class[_],
+      accessedFields: Map[Class[_], Set[String]]): AnyRef = {
+    val clone = instantiateClass(outerClass, parent)
+
+    var currentClass = outerClass
+    assert(currentClass != null, "The outer class can't be null.")
+
+    while (currentClass != null) {
+      setAccessedFields(currentClass, clone, obj, accessedFields)
+      currentClass = currentClass.getSuperclass()
+    }
+
+    clone
+  }
+
   /**
    * Clean the given closure in place.
    *
@@ -202,9 +250,8 @@ private[spark] object ClosureCleaner extends Logging {
       logDebug(s" + populating accessed fields because this is the starting closure")
       // Initialize accessed fields with the outer classes first
       // This step is needed to associate the fields to the correct classes later
-      for (cls <- outerClasses) {
-        accessedFields(cls) = Set.empty[String]
-      }
+      initAccessedFields(accessedFields, outerClasses)
+
       // Populate accessed fields by visiting all fields and methods accessed by this and
       // all of its inner closures.
If transitive cleaning is enabled, this may recursively
       // visits methods that belong to other classes in search of transitively referenced fields.
@@ -250,13 +297,8 @@ private[spark] object ClosureCleaner extends Logging {
         // required fields from the original object. We need the parent here because the Java
         // language specification requires the first constructor parameter of any closure to be
         // its enclosing object.
-        val clone = instantiateClass(cls, parent)
-        for (fieldName <- accessedFields(cls)) {
-          val field = cls.getDeclaredField(fieldName)
-          field.setAccessible(true)
-          val value = field.get(obj)
-          field.set(clone, value)
-        }
+        val clone = cloneAndSetFields(parent,
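The `initAccessedFields` change matters because accessed fields are tracked per class. The sketch below assumes, for illustration, a recorder that keeps an access only when its declaring class is already a key in the map. This follows the diff's comment that initialization is "needed to associate the fields to the correct classes later"; Spark's actual bytecode visitor is not shown here. With only the outer class seeded, a superclass field is silently dropped. Seeding the whole hierarchy keeps it. `ParentHolder`, `ChildHolder`, and `recordAccess` are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ParentHolder {
    String name = "parent";
}

class ChildHolder extends ParentHolder {
    int count = 1;
}

class AccessedFieldsInit {

    /** Old approach: seed one empty entry per outer class only. */
    static Map<Class<?>, Set<String>> initOuterOnly(List<Class<?>> outerClasses) {
        Map<Class<?>, Set<String>> accessed = new HashMap<>();
        for (Class<?> cls : outerClasses) {
            accessed.put(cls, new HashSet<>());
        }
        return accessed;
    }

    /** New approach: seed each outer class and every superclass up to java.lang.Object. */
    static Map<Class<?>, Set<String>> initWithSuperclasses(List<Class<?>> outerClasses) {
        Map<Class<?>, Set<String>> accessed = new HashMap<>();
        for (Class<?> cls : outerClasses) {
            if (cls == null) {
                throw new IllegalArgumentException("The outer class can't be null.");
            }
            for (Class<?> c = cls; c != null; c = c.getSuperclass()) {
                accessed.put(c, new HashSet<>());
            }
        }
        return accessed;
    }

    /** Hypothetical recorder: keeps an access only if the declaring class is already a key. */
    static void recordAccess(Map<Class<?>, Set<String>> accessed, Class<?> owner, String field) {
        Set<String> fields = accessed.get(owner);
        if (fields != null) {
            fields.add(field);
        }
    }

    public static void main(String[] args) {
        Map<Class<?>, Set<String>> before = initOuterOnly(List.of(ChildHolder.class));
        Map<Class<?>, Set<String>> after = initWithSuperclasses(List.of(ChildHolder.class));
        for (Map<Class<?>, Set<String>> accessed : List.of(before, after)) {
            recordAccess(accessed, ChildHolder.class, "count");
            recordAccess(accessed, ParentHolder.class, "name"); // declared on the superclass
        }
        System.out.println(before.get(ParentHolder.class)); // null: the access was dropped
        System.out.println(after.get(ParentHolder.class));  // [name]
    }
}
```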
spark git commit: [SPARK-20643][CORE] Add listener implementation to collect app state.
Repository: spark
Updated Branches:
  refs/heads/master a83d8d5ad -> 0e9a750a8

[SPARK-20643][CORE] Add listener implementation to collect app state.

The initial listener code is based on the existing JobProgressListener (and others), and tries to mimic their behavior as much as possible. The change also includes some minor code movement so that some types and methods from the initial history server code can be reused.

The code introduces a few mutable versions of public API types, used internally, to make it easier to update information without ugly copy methods, and also to make certain updates cheaper.

Note that the code here is not 100% correct. It is meant as a foundation for the UI integration in the next milestones. As different parts of the UI are ported, fixes will be made to the different parts of this code to account for the needed behavior.

I also added annotations to API types so that Jackson is able to correctly deserialize options, sequences and maps that store primitive types.

Author: Marcelo Vanzin

Closes #19383 from vanzin/SPARK-20643.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e9a750a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e9a750a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e9a750a Branch: refs/heads/master Commit: 0e9a750a8d389b3a17834584d31c204c77c6970d Parents: a83d8d5 Author: Marcelo Vanzin Authored: Thu Oct 26 11:05:16 2017 -0500 Committer: Imran Rashid Committed: Thu Oct 26 11:05:16 2017 -0500 -- .../apache/spark/util/kvstore/KVTypeInfo.java | 2 + .../org/apache/spark/util/kvstore/LevelDB.java | 2 +- .../apache/spark/status/api/v1/StageStatus.java | 3 +- .../deploy/history/FsHistoryProvider.scala | 37 +- .../apache/spark/deploy/history/config.scala| 6 - .../apache/spark/status/AppStatusListener.scala | 531 ++ .../scala/org/apache/spark/status/KVUtils.scala | 73 ++ .../org/apache/spark/status/LiveEntity.scala| 526 ++ .../spark/status/api/v1/AllStagesResource.scala | 4 +- .../org/apache/spark/status/api/v1/api.scala| 11 +- .../org/apache/spark/status/storeTypes.scala| 98 +++ .../deploy/history/FsHistoryProviderSuite.scala | 2 +- .../spark/status/AppStatusListenerSuite.scala | 690 +++ project/MimaExcludes.scala | 2 + 14 files changed, 1942 insertions(+), 45 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java -- diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java index a2b077e..870b484 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java @@ -46,6 +46,7 @@ public class KVTypeInfo { KVIndex idx = f.getAnnotation(KVIndex.class); if (idx != null) { checkIndex(idx, indices); +f.setAccessible(true); indices.put(idx.value(), idx); f.setAccessible(true); 
accessors.put(idx.value(), new FieldAccessor(f)); @@ -58,6 +59,7 @@ public class KVTypeInfo { checkIndex(idx, indices); Preconditions.checkArgument(m.getParameterTypes().length == 0, "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName()); +m.setAccessible(true); indices.put(idx.value(), idx); m.setAccessible(true); accessors.put(idx.value(), new MethodAccessor(m)); http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java -- diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index ff48b15..4f9e10c 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -76,7 +76,7 @@ public class LevelDB implements KVStore { this.types = new ConcurrentHashMap<>(); Options options = new Options(); -options.createIfMissing(!path.exists()); +options.createIfMissing(true); this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options)); byte[] versionData = db().get(STORE_VERSION_KEY);
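The commit message describes keeping mutable, internal versions of public API types so that a listener can update them in place on each event and only build the public object when it is needed. A minimal Java sketch of that pattern follows. `MutableJobState` and `JobSnapshot` are hypothetical names; they are not the classes this commit adds and are much simpler than anything in it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Immutable snapshot, the kind of object exposed to API consumers (hypothetical). */
final class JobSnapshot {
    final int jobId;
    final String status;
    final int completedTasks;
    final List<Integer> stageIds;

    JobSnapshot(int jobId, String status, int completedTasks, List<Integer> stageIds) {
        this.jobId = jobId;
        this.status = status;
        this.completedTasks = completedTasks;
        // Defensive copy so later updates to the live object cannot change this snapshot.
        this.stageIds = Collections.unmodifiableList(new ArrayList<>(stageIds));
    }
}

/** Mutable internal counterpart that a listener updates in place (hypothetical). */
final class MutableJobState {
    final int jobId;
    String status = "RUNNING";
    int completedTasks = 0;
    final List<Integer> stageIds = new ArrayList<>();

    MutableJobState(int jobId) {
        this.jobId = jobId;
    }

    void onStageSubmitted(int stageId) { stageIds.add(stageId); }

    // A plain field update: no immutable object is copied on every event.
    void onTaskEnd() { completedTasks++; }

    void onJobEnd(boolean succeeded) { status = succeeded ? "SUCCEEDED" : "FAILED"; }

    /** Builds the immutable view only when it is about to be stored or served. */
    JobSnapshot toApi() { return new JobSnapshot(jobId, status, completedTasks, stageIds); }
}

class LiveEntityDemo {
    public static void main(String[] args) {
        MutableJobState job = new MutableJobState(1);
        job.onStageSubmitted(10);
        job.onTaskEnd();
        job.onTaskEnd();
        JobSnapshot before = job.toApi();
        job.onJobEnd(true);
        JobSnapshot after = job.toApi();
        System.out.println(before.status + " " + before.completedTasks); // RUNNING 2
        System.out.println(after.status + " " + after.stageIds);         // SUCCEEDED [10]
    }
}
```

The trade-off is that callers must remember to call `toApi()` before handing data out. In exchange, each event costs a field write instead of a full object copy.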
spark git commit: [SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3e77b7481 -> aa023fddb

[SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR

## What changes were proposed in this pull request?

This PR proposes to revive the `stringsAsFactors` option in the collect API, which was mistakenly removed in https://github.com/apache/spark/commit/71a138cd0e0a14e8426f97877e3b52a562bbd02c.

It simply casts `character` to `factor` when the condition `stringsAsFactors && is.character(vec)` holds during primitive type conversion.

## How was this patch tested?

Unit test in `R/pkg/tests/fulltests/test_sparkSQL.R`.

Author: hyukjinkwon

Closes #19551 from HyukjinKwon/SPARK-17902.

(cherry picked from commit a83d8d5adcb4e0061e43105767242ba9770dda96)
Signed-off-by: hyukjinkwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa023fdd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa023fdd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa023fdd

Branch: refs/heads/branch-2.1
Commit: aa023fddb0abb6cf8ded94ac695ba7b0edb02022
Parents: 3e77b74
Author: hyukjinkwon
Authored: Thu Oct 26 20:54:36 2017 +0900
Committer: hyukjinkwon
Committed: Thu Oct 26 20:55:14 2017 +0900

--
 R/pkg/R/DataFrame.R                   | 3 +++
 R/pkg/tests/fulltests/test_sparkSQL.R | 6 ++
 2 files changed, 9 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/aa023fdd/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index d0f0979..5899fa8 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1173,6 +1173,9 @@ setMethod("collect",
                     vec <- do.call(c, col)
                     stopifnot(class(vec) != "list")
                     class(vec) <- PRIMITIVE_TYPES[[colType]]
+                    if (is.character(vec) && stringsAsFactors) {
+                      vec <- as.factor(vec)
+                    }
                     df[[colIndex]] <- vec
                   } else {
                     df[[colIndex]] <- col

http://git-wip-us.apache.org/repos/asf/spark/blob/aa023fdd/R/pkg/tests/fulltests/test_sparkSQL.R
--
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R
b/R/pkg/tests/fulltests/test_sparkSQL.R index fedca67..0b88e47 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -417,6 +417,12 @@ test_that("create DataFrame with different data types", { expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE)) }) +test_that("SPARK-17902: collect() with stringsAsFactors enabled", { + df <- suppressWarnings(collect(createDataFrame(iris), stringsAsFactors = TRUE)) + expect_equal(class(iris$Species), class(df$Species)) + expect_equal(iris$Species, df$Species) +}) + test_that("SPARK-17811: can create DataFrame containing NA as date and time", { df <- data.frame( id = 1:2,
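With `stringsAsFactors = TRUE`, the patch passes character columns through R's `as.factor()`, which turns a character vector into sorted unique levels plus one integer code per element. The following rough Java analogue shows that conversion together with the patch's `is.character(vec) && stringsAsFactors` guard. It assumes `null` stands in for `NA` (encoded as code 0 here, whereas R uses `NA_integer_`). It also orders levels with Java's `String` ordering, which can differ from R's locale-dependent sort, for example with mixed case. `FactorSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

final class FactorSketch {
    final List<String> levels; // sorted unique non-null values
    final int[] codes;         // 1-based index into levels; 0 marks NA in this sketch

    private FactorSketch(List<String> levels, int[] codes) {
        this.levels = Collections.unmodifiableList(levels);
        this.codes = codes;
    }

    /** Rough analogue of R's as.factor() on a character vector. */
    static FactorSketch asFactor(String[] values) {
        TreeSet<String> sorted = new TreeSet<>();
        for (String v : values) {
            if (v != null) {
                sorted.add(v);
            }
        }
        List<String> levels = new ArrayList<>(sorted);
        Map<String, Integer> index = new HashMap<>();
        for (int i = 0; i < levels.size(); i++) {
            index.put(levels.get(i), i + 1);
        }
        int[] codes = new int[values.length];
        for (int i = 0; i < values.length; i++) {
            codes[i] = values[i] == null ? 0 : index.get(values[i]);
        }
        return new FactorSketch(levels, codes);
    }

    /** Mirrors the patch's guard: convert only when the flag is set and the column is character. */
    static Object maybeConvert(Object column, boolean stringsAsFactors) {
        if (stringsAsFactors && column instanceof String[]) {
            return asFactor((String[]) column);
        }
        return column;
    }

    public static void main(String[] args) {
        String[] species = {"setosa", "virginica", "setosa", "versicolor", null};
        FactorSketch f = asFactor(species);
        System.out.println(f.levels);                 // [setosa, versicolor, virginica]
        System.out.println(Arrays.toString(f.codes)); // [1, 3, 1, 2, 0]
    }
}
```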
spark git commit: [SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR
Repository: spark
Updated Branches:
  refs/heads/master 3073344a2 -> a83d8d5ad

[SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR

## What changes were proposed in this pull request?

This PR proposes to revive the `stringsAsFactors` option in the collect API, which was mistakenly removed in https://github.com/apache/spark/commit/71a138cd0e0a14e8426f97877e3b52a562bbd02c.

It simply casts `character` to `factor` when the condition `stringsAsFactors && is.character(vec)` holds during primitive type conversion.

## How was this patch tested?

Unit test in `R/pkg/tests/fulltests/test_sparkSQL.R`.

Author: hyukjinkwon

Closes #19551 from HyukjinKwon/SPARK-17902.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a83d8d5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a83d8d5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a83d8d5a

Branch: refs/heads/master
Commit: a83d8d5adcb4e0061e43105767242ba9770dda96
Parents: 3073344
Author: hyukjinkwon
Authored: Thu Oct 26 20:54:36 2017 +0900
Committer: hyukjinkwon
Committed: Thu Oct 26 20:54:36 2017 +0900

--
 R/pkg/R/DataFrame.R                   | 3 +++
 R/pkg/tests/fulltests/test_sparkSQL.R | 6 ++
 2 files changed, 9 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a83d8d5a/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 176bb3b..aaa3349 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1191,6 +1191,9 @@ setMethod("collect",
                     vec <- do.call(c, col)
                     stopifnot(class(vec) != "list")
                     class(vec) <- PRIMITIVE_TYPES[[colType]]
+                    if (is.character(vec) && stringsAsFactors) {
+                      vec <- as.factor(vec)
+                    }
                     df[[colIndex]] <- vec
                   } else {
                     df[[colIndex]] <- col

http://git-wip-us.apache.org/repos/asf/spark/blob/a83d8d5a/R/pkg/tests/fulltests/test_sparkSQL.R
--
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 4382ef2..0c8118a 100644
---
a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -499,6 +499,12 @@ test_that("create DataFrame with different data types", { expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE)) }) +test_that("SPARK-17902: collect() with stringsAsFactors enabled", { + df <- suppressWarnings(collect(createDataFrame(iris), stringsAsFactors = TRUE)) + expect_equal(class(iris$Species), class(df$Species)) + expect_equal(iris$Species, df$Species) +}) + test_that("SPARK-17811: can create DataFrame containing NA as date and time", { df <- data.frame( id = 1:2,
spark git commit: [SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 d2dc175a1 -> 24fe7ccba

[SPARK-17902][R] Revive stringsAsFactors option for collect() in SparkR

## What changes were proposed in this pull request?

This PR proposes to revive the `stringsAsFactors` option in the collect API, which was mistakenly removed in https://github.com/apache/spark/commit/71a138cd0e0a14e8426f97877e3b52a562bbd02c.

It simply casts `character` to `factor` when the condition `stringsAsFactors && is.character(vec)` holds during primitive type conversion.

## How was this patch tested?

Unit test in `R/pkg/tests/fulltests/test_sparkSQL.R`.

Author: hyukjinkwon

Closes #19551 from HyukjinKwon/SPARK-17902.

(cherry picked from commit a83d8d5adcb4e0061e43105767242ba9770dda96)
Signed-off-by: hyukjinkwon

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24fe7ccb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24fe7ccb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24fe7ccb

Branch: refs/heads/branch-2.2
Commit: 24fe7ccbacd913c19fa40199fd5511aaf55c6bfa
Parents: d2dc175
Author: hyukjinkwon
Authored: Thu Oct 26 20:54:36 2017 +0900
Committer: hyukjinkwon
Committed: Thu Oct 26 20:55:00 2017 +0900

--
 R/pkg/R/DataFrame.R                   | 3 +++
 R/pkg/tests/fulltests/test_sparkSQL.R | 6 ++
 2 files changed, 9 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/24fe7ccb/R/pkg/R/DataFrame.R
--
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 3859fa8..c0a954d 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1174,6 +1174,9 @@ setMethod("collect",
                     vec <- do.call(c, col)
                     stopifnot(class(vec) != "list")
                     class(vec) <- PRIMITIVE_TYPES[[colType]]
+                    if (is.character(vec) && stringsAsFactors) {
+                      vec <- as.factor(vec)
+                    }
                     df[[colIndex]] <- vec
                   } else {
                     df[[colIndex]] <- col

http://git-wip-us.apache.org/repos/asf/spark/blob/24fe7ccb/R/pkg/tests/fulltests/test_sparkSQL.R
--
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R
b/R/pkg/tests/fulltests/test_sparkSQL.R index 12d8fef..50c60fe 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -483,6 +483,12 @@ test_that("create DataFrame with different data types", { expect_equal(collect(df), data.frame(l, stringsAsFactors = FALSE)) }) +test_that("SPARK-17902: collect() with stringsAsFactors enabled", { + df <- suppressWarnings(collect(createDataFrame(iris), stringsAsFactors = TRUE)) + expect_equal(class(iris$Species), class(df$Species)) + expect_equal(iris$Species, df$Species) +}) + test_that("SPARK-17811: can create DataFrame containing NA as date and time", { df <- data.frame( id = 1:2,
spark git commit: [SPARK-21840][CORE] Add trait that allows conf to be directly set in application.
Repository: spark
Updated Branches:
  refs/heads/master 592cfeab9 -> 3073344a2

[SPARK-21840][CORE] Add trait that allows conf to be directly set in application.

Currently SparkSubmit uses system properties to propagate configuration to applications. This makes it hard to implement features such as SPARK-11035, which would allow multiple applications to be started in the same JVM. The current code would cause the config data from multiple apps to get mixed up.

This change introduces a new trait, currently internal to Spark, that allows the app configuration to be passed directly to the application, without having to use system properties. The current "call main() method" behavior is maintained as an implementation of this new trait. This will be useful to allow multiple cluster-mode apps to be submitted from the same JVM.

As part of this, SparkSubmit was modified to collect all configuration directly into a SparkConf instance. Most of the changes are to tests so they use SparkConf instead of an opaque map.

Tested with existing and added unit tests.

Author: Marcelo Vanzin

Closes #19519 from vanzin/SPARK-21840.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3073344a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3073344a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3073344a Branch: refs/heads/master Commit: 3073344a2551fb198d63f2114a519ab97904cb55 Parents: 592cfea Author: Marcelo Vanzin Authored: Thu Oct 26 15:50:27 2017 +0800 Committer: jerryshao Committed: Thu Oct 26 15:50:27 2017 +0800 -- .../apache/spark/deploy/SparkApplication.scala | 55 + .../org/apache/spark/deploy/SparkSubmit.scala | 160 +++--- .../apache/spark/deploy/SparkSubmitSuite.scala | 213 +++ .../deploy/rest/StandaloneRestSubmitSuite.scala | 4 +- 4 files changed, 257 insertions(+), 175 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3073344a/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala b/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala new file mode 100644 index 000..118b460 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/SparkApplication.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy + +import java.lang.reflect.Modifier + +import org.apache.spark.SparkConf + +/** + * Entry point for a Spark application. Implementations must provide a no-argument constructor. + */ +private[spark] trait SparkApplication { + + def start(args: Array[String], conf: SparkConf): Unit + +} + +/** + * Implementation of SparkApplication that wraps a standard Java class with a "main" method. + * + * Configuration is propagated to the application via system properties, so running multiple + * of these in the same JVM may lead to undefined behavior due to configuration leaks. + */ +private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication { + + override def start(args: Array[String], conf: SparkConf): Unit = { +val mainMethod = klass.getMethod("main", new Array[String](0).getClass) +if (!Modifier.isStatic(mainMethod.getModifiers)) { + throw new IllegalStateException("The main method in the given main class must be static") +} + +val sysProps = conf.getAll.toMap +sysProps.foreach { case (k, v) => + sys.props(k) = v +} + +mainMethod.invoke(null, args) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/3073344a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index b7e6d0e..73b956e 100644 ---
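The new trait separates "start an application with this configuration" from "call a static `main`". The sketch below restates that idea with plain JDK types: `AppEntryPoint` stands in for `SparkApplication`, a `Map<String, String>` stands in for `SparkConf`, and every class name is hypothetical. The main-method wrapper mirrors `JavaMainApplication` in the diff by copying the configuration into system properties before invoking `main`. That JVM-wide state is what the commit message warns can leak between applications.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;

/** Hypothetical stand-in for the SparkApplication trait. */
interface AppEntryPoint {
    void start(String[] args, Map<String, String> conf) throws Exception;
}

/** Wraps a class with a static main(String[]); config travels through system properties. */
final class MainMethodEntryPoint implements AppEntryPoint {
    private final Class<?> klass;

    MainMethodEntryPoint(Class<?> klass) {
        this.klass = klass;
    }

    @Override
    public void start(String[] args, Map<String, String> conf) throws Exception {
        Method main = klass.getMethod("main", String[].class);
        if (!Modifier.isStatic(main.getModifiers())) {
            throw new IllegalStateException("The main method in the given main class must be static");
        }
        conf.forEach(System::setProperty); // visible to every app running in this JVM
        main.invoke(null, (Object) args);
    }
}

/** A legacy-style app that can only read its settings from system properties. */
class LegacyApp {
    static String seenName;

    public static void main(String[] args) {
        seenName = System.getProperty("spark.app.name");
    }
}

/** An app that receives its configuration directly, with no global state. */
final class DirectConfApp implements AppEntryPoint {
    String seenName;

    @Override
    public void start(String[] args, Map<String, String> conf) {
        seenName = conf.get("spark.app.name");
    }
}

class EntryPointDemo {
    public static void main(String[] args) throws Exception {
        new MainMethodEntryPoint(LegacyApp.class).start(new String[0], Map.of("spark.app.name", "a"));
        DirectConfApp direct = new DirectConfApp();
        direct.start(new String[0], Map.of("spark.app.name", "b"));
        // The direct app saw "b", but the JVM-wide property still holds the legacy app's "a".
        System.out.println(LegacyApp.seenName + " " + direct.seenName + " "
            + System.getProperty("spark.app.name")); // a b a
    }
}
```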
spark git commit: [SPARK-22308] Support alternative unit testing styles in external applications
Repository: spark
Updated Branches:
  refs/heads/master 5433be44c -> 592cfeab9

[SPARK-22308] Support alternative unit testing styles in external applications

## What changes were proposed in this pull request?

Support unit tests of external code (i.e., applications that use Spark) that are written with ScalaTest styles other than FunSuite. SharedSparkContext already supports this, but SharedSQLContext does not.

I've introduced SharedSparkSession as a parent to SharedSQLContext, written so that it supports all ScalaTest styles.

## How was this patch tested?

Three new unit test suites were added that exercise FunSpec, FlatSpec, and WordSpec.

Author: Nathan Kronenfeld

Closes #19529 from nkronenfeld/alternative-style-tests-2.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/592cfeab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/592cfeab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/592cfeab

Branch: refs/heads/master
Commit: 592cfeab9caeff955d115a1ca5014ede7d402907
Parents: 5433be4
Author: Nathan Kronenfeld
Authored: Thu Oct 26 00:29:49 2017 -0700
Committer: gatorsmile
Committed: Thu Oct 26 00:29:49 2017 -0700

--
 .../org/apache/spark/SharedSparkContext.scala   |  17 +-
 .../spark/sql/catalyst/plans/PlanTest.scala     |  10 +-
 .../spark/sql/test/GenericFlatSpecSuite.scala   |  45 +
 .../spark/sql/test/GenericFunSpecSuite.scala    |  47 +
 .../spark/sql/test/GenericWordSpecSuite.scala   |  51 ++
 .../apache/spark/sql/test/SQLTestUtils.scala    | 173 ++-
 .../spark/sql/test/SharedSQLContext.scala       |  84 +
 .../spark/sql/test/SharedSparkSession.scala     | 119 +
 8 files changed, 381 insertions(+), 165 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/592cfeab/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
--
diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index 6aedcb1..1aa1c42 100644
---
a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala +++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala @@ -29,10 +29,23 @@ trait SharedSparkContext extends BeforeAndAfterAll with BeforeAndAfterEach { sel var conf = new SparkConf(false) + /** + * Initialize the [[SparkContext]]. Generally, this is just called from beforeAll; however, in + * test using styles other than FunSuite, there is often code that relies on the session between + * test group constructs and the actual tests, which may need this session. It is purely a + * semantic difference, but semantically, it makes more sense to call 'initializeContext' between + * a 'describe' and an 'it' call than it does to call 'beforeAll'. + */ + protected def initializeContext(): Unit = { +if (null == _sc) { + _sc = new SparkContext( +"local[4]", "test", conf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)) +} + } + override def beforeAll() { super.beforeAll() -_sc = new SparkContext( - "local[4]", "test", conf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)) +initializeContext() } override def afterAll() { http://git-wip-us.apache.org/repos/asf/spark/blob/592cfeab/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala index 10bdfaf..82c5307 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.plans +import org.scalatest.Suite + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer @@ -29,7 +31,13 @@ import org.apache.spark.sql.internal.SQLConf /** * Provides helper methods for comparing 
plans. */ -trait PlanTest extends SparkFunSuite with PredicateHelper { +trait PlanTest extends SparkFunSuite with PlanTestBase + +/** + * Provides helper methods for comparing plans, but without the overhead of + * mandating a FunSuite. + */ +trait PlanTestBase extends PredicateHelper { self: Suite => // TODO(gatorsmile): remove this from PlanTest and all the analyzer rules protected def conf = SQLConf.get
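`initializeContext()` in the `SharedSparkContext` diff is idempotent. It creates the context only if `_sc` is still null, so it can be called from `beforeAll()` or from between a `describe` and an `it` block without building a second context. The same guard is sketched below in Java with hypothetical `ExpensiveResource` and `SharedResourceFixture` classes; no Spark or ScalaTest is involved. Like the original `null == _sc` check, it is not thread-safe. That is fine for a single test suite, but it would need synchronization if several threads could initialize the fixture at once.

```java
/** Hypothetical stand-in for a costly shared object such as a SparkContext. */
final class ExpensiveResource {
    static int created = 0; // counts constructions so the demo can show reuse
    final String master;

    ExpensiveResource(String master) {
        this.master = master;
        created++;
    }

    void stop() {}
}

class SharedResourceFixture {
    private ExpensiveResource resource; // null until first initialized

    /** Safe to call repeatedly: only the first call creates the resource. */
    protected void initializeResource() {
        if (resource == null) {
            resource = new ExpensiveResource("local[4]");
        }
    }

    void beforeAll() {
        initializeResource();
    }

    void afterAll() {
        if (resource != null) {
            resource.stop();
            resource = null;
        }
    }

    ExpensiveResource resource() {
        return resource;
    }

    public static void main(String[] args) {
        SharedResourceFixture fixture = new SharedResourceFixture();
        fixture.initializeResource(); // e.g. called between describe(...) and it(...)
        fixture.beforeAll();          // a later call reuses the existing resource
        System.out.println(ExpensiveResource.created); // 1
        fixture.afterAll();
    }
}
```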