svn commit: r26334 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_13_20_01-73f2853-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Sat Apr 14 03:15:33 2018 New Revision: 26334 Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_13_20_01-73f2853 docs [This commit notification would consist of 1458 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-23979][SQL] MultiAlias should not be a CodegenFallback
Repository: spark Updated Branches: refs/heads/master cbb41a0c5 -> 73f28530d [SPARK-23979][SQL] MultiAlias should not be a CodegenFallback ## What changes were proposed in this pull request? Just found that `MultiAlias` is a `CodegenFallback`. It should not be, since it looks like `MultiAlias` is never evaluated. ## How was this patch tested? Existing tests. Author: Liang-Chi Hsieh Closes #21065 from viirya/multialias-without-codegenfallback. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73f28530 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73f28530 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73f28530 Branch: refs/heads/master Commit: 73f28530d6f6dd8aba758ea818c456cf911a5f41 Parents: cbb41a0 Author: Liang-Chi Hsieh Authored: Sat Apr 14 08:59:04 2018 +0800 Committer: Wenchen Fan Committed: Sat Apr 14 08:59:04 2018 +0800 -- .../org/apache/spark/sql/catalyst/analysis/unresolved.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/73f28530/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index a65f58f..71e2317 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodegenFallback, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.parser.ParserUtils import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode} import org.apache.spark.sql.catalyst.trees.TreeNode @@ -335,7 +335,7 @@ case class UnresolvedRegex(regexPattern: String, table: Option[String], caseSens * @param names the names to be associated with each output of computing [[child]]. */ case class MultiAlias(child: Expression, names: Seq[String]) - extends UnaryExpression with NamedExpression with CodegenFallback { + extends UnaryExpression with NamedExpression with Unevaluable { override def name: String = throw new UnresolvedException(this, "name")
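For context on the trait change above: `CodegenFallback` marks an expression that is still evaluable (it merely falls back to interpreted evaluation inside generated code), whereas `Unevaluable` marks an expression that must be resolved away by the analyzer and throws if evaluation is ever attempted. Since `MultiAlias` exists only during analysis, failing loudly is the safer contract. A minimal, self-contained sketch of that distinction (toy types, not the real Catalyst classes, which also handle codegen, nullability, and `InternalRow`):

```scala
// Toy model of the two contracts, illustrating why an analysis-only expression
// should be Unevaluable rather than CodegenFallback.
trait Expression {
  def eval(input: Seq[Any]): Any
}

// CodegenFallback-style: evaluation still works, it is just not code-generated.
trait InterpretedFallback extends Expression

// Unevaluable-style: evaluation is a contract violation; the analyzer must have
// replaced this node before execution ever reaches it.
trait Unevaluable extends Expression {
  final override def eval(input: Seq[Any]): Any =
    throw new UnsupportedOperationException(s"Cannot evaluate expression: $this")
}

case class Literal(value: Any) extends Expression {
  override def eval(input: Seq[Any]): Any = value
}

// Analysis-only node, analogous to MultiAlias: it only carries names for the analyzer.
case class MultiAliasSketch(child: Expression, names: Seq[String]) extends Unevaluable

object UnevaluableDemo extends App {
  val ma = MultiAliasSketch(Literal(1), Seq("a", "b"))
  try ma.eval(Nil) catch {
    case e: UnsupportedOperationException => println(e.getMessage) // fails loudly, as intended
  }
}
```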
spark git commit: [SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common CheckpointFileManager interface
Repository: spark Updated Branches: refs/heads/master 558f31b31 -> cbb41a0c5 [SPARK-23966][SS] Refactoring all checkpoint file writing logic in a common CheckpointFileManager interface ## What changes were proposed in this pull request? Checkpoint files (offset log files, state store files) in Structured Streaming must be written atomically, such that no partial files are generated (partial files would break fault-tolerance guarantees). Currently, there are 3 locations which try to do this individually, and in some cases, incorrectly. 1. HDFSOffsetMetadataLog - This uses a FileManager interface to use any implementation of `FileSystem` or `FileContext` APIs. It prefers the `FileContext` implementation, because HDFS's `FileContext` has atomic renames. 1. HDFSBackedStateStore (aka in-memory state store) - Writing a version.delta file - This uses FileSystem APIs only to perform a rename. This is incorrect, as rename is not atomic in the HDFS FileSystem implementation. - Writing a snapshot file - Same as above. Current problems: 1. State Store behavior is incorrect - the HDFS FileSystem implementation does not have atomic rename. 1. Inflexible - Some file systems provide mechanisms other than write-to-temp-file-and-rename for writing atomically and more efficiently. For example, with S3 you can write directly to the final file and it will be made visible only when the entire file is written and closed correctly. Any failure can be made to terminate the writing without making any partial files visible in S3. The current code does not abstract this mechanism out well enough for it to be customized. Solution: 1. Introduce a common interface that all 3 cases above can use to write checkpoint files atomically. 2. This interface must provide the hooks needed to customize the write-and-rename mechanism. This PR does that by introducing the interface `CheckpointFileManager` and modifying `HDFSMetadataLog` and `HDFSBackedStateStore` to use the interface. Similar to the earlier `FileManager`, there are implementations based on the `FileSystem` and `FileContext` APIs, and the latter implementation is preferred so that it works correctly with HDFS. The key method in this interface is `createAtomic(path, overwrite)`, which returns a `CancellableFSDataOutputStream` that has the method `cancel()`. All users of this method need to either call `close()` to successfully write the file, or `cancel()` in case of an error. ## How was this patch tested? New tests in `CheckpointFileManagerSuite` and slightly modified existing tests. Author: Tathagata Das Closes #21048 from tdas/SPARK-23966.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cbb41a0c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cbb41a0c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cbb41a0c Branch: refs/heads/master Commit: cbb41a0c5b01579c85f06ef42cc0585fbef216c5 Parents: 558f31b Author: Tathagata Das Authored: Fri Apr 13 16:31:39 2018 -0700 Committer: Tathagata Das Committed: Fri Apr 13 16:31:39 2018 -0700 -- .../org/apache/spark/sql/internal/SQLConf.scala | 7 + .../streaming/CheckpointFileManager.scala | 349 +++ .../execution/streaming/HDFSMetadataLog.scala | 229 +--- .../state/HDFSBackedStateStoreProvider.scala| 120 +++ .../execution/streaming/state/StateStore.scala | 4 +- .../streaming/CheckpointFileManagerSuite.scala | 192 ++ .../CompactibleFileStreamLogSuite.scala | 5 - .../streaming/HDFSMetadataLogSuite.scala| 116 +- .../streaming/state/StateStoreSuite.scala | 58 ++- 9 files changed, 678 insertions(+), 402 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1c8ab9c..0dc47bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -930,6 +930,13 @@ object SQLConf { .intConf .createWithDefault(100) + val STREAMING_CHECKPOINT_FILE_MANAGER_CLASS = +buildConf("spark.sql.streaming.checkpointFileManagerClass") + .doc("The class used to write checkpoint files atomically. This class must be a subclass " + +"of the interface CheckpointFileManager.") + .internal() + .stringConf + val NDV_MAX_ERROR = buildConf("spark.sql.statistics.ndv.maxError") .internal() http://git-wip-us.apache.org/repos/asf/spark/blob/cbb41a0c/sql
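To make the `createAtomic`/`cancel()` contract described above concrete, here is a minimal sketch of the write pattern callers are expected to follow. The toy types, names, and the in-memory implementation are illustrative only; the real `CheckpointFileManager` works against Hadoop paths and a `CancellableFSDataOutputStream`.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import scala.collection.mutable

// Simplified stand-ins for the real interface: close() makes the file visible atomically,
// cancel() guarantees no partial file is ever observed.
abstract class CancellableStream extends OutputStream {
  def cancel(): Unit
}

trait CheckpointFileManagerSketch {
  def createAtomic(path: String, overwrite: Boolean): CancellableStream
}

// Tiny in-memory implementation, just to make the contract runnable.
class InMemoryFileManager extends CheckpointFileManagerSketch {
  val visibleFiles = mutable.Map.empty[String, Array[Byte]]
  override def createAtomic(path: String, overwrite: Boolean): CancellableStream =
    new CancellableStream {
      private val buf = new ByteArrayOutputStream()
      private var cancelled = false
      override def write(b: Int): Unit = buf.write(b)
      override def close(): Unit = if (!cancelled) visibleFiles(path) = buf.toByteArray // commit
      override def cancel(): Unit = cancelled = true                                    // abort, nothing visible
    }
}

object AtomicWriteDemo extends App {
  val fm = new InMemoryFileManager
  val out = fm.createAtomic("offsets/42", overwrite = false)
  try {
    out.write("batch 42 offsets".getBytes("UTF-8"))
    out.close()          // only now does offsets/42 become visible
  } catch {
    case t: Throwable => out.cancel(); throw t   // on any error, leave no partial file
  }
  println(fm.visibleFiles.keySet)                // one committed file: offsets/42
}
```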
svn commit: r26333 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_13_16_01-558f31b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Apr 13 23:15:51 2018 New Revision: 26333 Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_13_16_01-558f31b docs [This commit notification would consist of 1458 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table
Repository: spark Updated Branches: refs/heads/master 25892f3cc -> 558f31b31 [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table ## What changes were proposed in this pull request? TableReader would get disproportionately slower as the number of columns in the query increased. I fixed the way TableReader was looking up metadata for each column in the row. Previously, it had been looking up this data in linked lists, accessing each linked list by an index (column number). Now it looks up this data in arrays, where indexing by column number works better. ## How was this patch tested? Manual testing All sbt unit tests python sql tests Author: Bruce Robbins Closes #21043 from bersprockets/tabreadfix. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/558f31b3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/558f31b3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/558f31b3 Branch: refs/heads/master Commit: 558f31b31c73b7e9f26f56498b54cf53997b59b8 Parents: 25892f3 Author: Bruce Robbins Authored: Fri Apr 13 14:05:04 2018 -0700 Committer: gatorsmile Committed: Fri Apr 13 14:05:04 2018 -0700 -- .../src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/558f31b3/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index cc8907a..b5444a4 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -381,7 +381,7 @@ private[hive] object HadoopTableReader extends HiveInspectors with Logging { val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) => soi.getStructFieldRef(attr.name) -> ordinal -}.unzip +}.toArray.unzip /** * Builds specific unwrappers ahead of time according to object inspector
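The one-line `.toArray.unzip` fix works because the per-row loop indexes `fieldRefs` and `fieldOrdinals` by column position; positional access on a linked sequence is O(n) per lookup, so per-row cost grows quadratically with the number of columns, while an `Array` makes each lookup O(1). An illustrative micro-benchmark (not part of the PR) of that difference:

```scala
// Positional indexing cost: List(i) walks i links per access, Array(i) is constant time.
object IndexingCost extends App {
  val numCols = 5000
  val asList: List[Int]   = List.tabulate(numCols)(identity)
  val asArray: Array[Int] = asList.toArray

  def time(label: String)(body: => Long): Unit = {
    val start = System.nanoTime()
    val result = body
    println(s"$label: result=$result, ${(System.nanoTime() - start) / 1000000} ms")
  }

  // Simulates processing one wide row: touch the metadata slot for every column ordinal.
  time("linked list") { var s = 0L; var i = 0; while (i < numCols) { s += asList(i);  i += 1 }; s }
  time("array")       { var s = 0L; var i = 0; while (i < numCols) { s += asArray(i); i += 1 }; s }
}
```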
spark-website git commit: Update committer page
Repository: spark-website Updated Branches: refs/heads/asf-site 658467248 -> 69b595481 Update committer page Author: DB Tsai Closes #113 from dbtsai/changeAffiliation. Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/69b59548 Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/69b59548 Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/69b59548 Branch: refs/heads/asf-site Commit: 69b595481c6b6866a66f1e7a45265246eb36b2c1 Parents: 6584672 Author: DB Tsai Authored: Fri Apr 13 13:50:48 2018 -0700 Committer: DB Tsai Committed: Fri Apr 13 13:50:48 2018 -0700 -- committers.md| 2 +- site/committers.html | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/69b59548/committers.md -- diff --git a/committers.md b/committers.md index f538476..eef9f2c 100644 --- a/committers.md +++ b/committers.md @@ -60,7 +60,7 @@ navigation: |Saisai Shao|Hortonworks| |Prashant Sharma|IBM| |Ram Sriharsha|Databricks| -|DB Tsai|Netflix| +|DB Tsai|Apple| |Takuya Ueshin|Databricks| |Marcelo Vanzin|Cloudera| |Shivaram Venkataraman|University of Wisconsin, Madison| http://git-wip-us.apache.org/repos/asf/spark-website/blob/69b59548/site/committers.html -- diff --git a/site/committers.html b/site/committers.html index 014beae..c5b40e9 100644 --- a/site/committers.html +++ b/site/committers.html @@ -412,7 +412,7 @@ DB Tsai - Netflix + Apple Takuya Ueshin
svn commit: r26330 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_13_12_01-25892f3-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Apr 13 19:16:00 2018 New Revision: 26330 Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_13_12_01-25892f3 docs [This commit notification would consist of 1458 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer
Repository: spark Updated Branches: refs/heads/master 4dfd746de -> 25892f3cc [SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer ## What changes were proposed in this pull request? Added a new rule to remove Sort operation when its child is already sorted. For instance, this simple code: ``` spark.sparkContext.parallelize(Seq(("a", "b"))).toDF("a", "b").registerTempTable("table1") val df = sql(s"""SELECT b | FROM ( | SELECT a, b | FROM table1 | ORDER BY a | ) t | ORDER BY a""".stripMargin) df.explain(true) ``` before the PR produces this plan: ``` == Parsed Logical Plan == 'Sort ['a ASC NULLS FIRST], true +- 'Project ['b] +- 'SubqueryAlias t +- 'Sort ['a ASC NULLS FIRST], true +- 'Project ['a, 'b] +- 'UnresolvedRelation `table1` == Analyzed Logical Plan == b: string Project [b#7] +- Sort [a#6 ASC NULLS FIRST], true +- Project [b#7, a#6] +- SubqueryAlias t +- Sort [a#6 ASC NULLS FIRST], true +- Project [a#6, b#7] +- SubqueryAlias table1 +- Project [_1#3 AS a#6, _2#4 AS b#7] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4] +- ExternalRDD [obj#2] == Optimized Logical Plan == Project [b#7] +- Sort [a#6 ASC NULLS FIRST], true +- Project [b#7, a#6] +- Sort [a#6 ASC NULLS FIRST], true +- Project [_1#3 AS a#6, _2#4 AS b#7] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4] +- ExternalRDD [obj#2] == Physical Plan == *(3) Project [b#7] +- *(3) Sort [a#6 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200) +- *(2) Project [b#7, a#6] +- *(2) Sort [a#6 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 200) +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7] +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4] +- Scan ExternalRDDScan[obj#2] ``` while after the PR produces: ``` == Parsed Logical Plan == 'Sort ['a ASC NULLS FIRST], true +- 'Project ['b] +- 'SubqueryAlias t +- 'Sort ['a ASC NULLS FIRST], true +- 'Project ['a, 'b] +- 'UnresolvedRelation `table1` == Analyzed Logical Plan == b: string Project [b#7] +- Sort [a#6 ASC NULLS FIRST], true +- Project [b#7, a#6] +- SubqueryAlias t +- Sort [a#6 ASC NULLS FIRST], true +- Project [a#6, b#7] +- SubqueryAlias table1 +- Project [_1#3 AS a#6, _2#4 AS b#7] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2, true, false) AS _2#4] +- ExternalRDD [obj#2] == Optimized Logical Plan == Project [b#7] +- Sort [a#6 ASC NULLS FIRST], true +- Project [_1#3 AS a#6, 
_2#4 AS b#7] +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._2, true, false) AS _2#4] +- ExternalRDD [obj#2] == Physical Plan == *(2) Project [b#7] +- *(2) Sort [a#6 ASC NULLS FIRST], true, 0 +- Exchange rangepartitioning(a#6 ASC NULLS FIRST, 5) +- *(1) Project [_1#3 AS a#6, _2#4 AS b#7] +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#3, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2
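The new rule boils down to: during optimization, drop a `Sort` whose child already guarantees the requested ordering. A toy sketch of that idea over a simplified plan tree (the real rule operates on Catalyst's `LogicalPlan` and `SortOrder`, so treat this only as an illustration of the shape of the rewrite):

```scala
// Toy plan tree with a per-node "ordering it guarantees"; Catalyst tracks this with
// SortOrder expressions rather than bare column names.
sealed trait Plan { def outputOrdering: Seq[String] }
case class Relation(name: String) extends Plan { val outputOrdering: Seq[String] = Nil }
case class Project(cols: Seq[String], child: Plan) extends Plan {
  // A projection preserves the child's ordering as long as the ordering columns survive.
  val outputOrdering: Seq[String] = child.outputOrdering.takeWhile(cols.contains)
}
case class Sort(order: Seq[String], child: Plan) extends Plan {
  val outputOrdering: Seq[String] = order
}

object EliminateRedundantSort {
  def apply(plan: Plan): Plan = plan match {
    case Sort(order, child) =>
      val optimizedChild = apply(child)
      // Child already produces the requested ordering: the Sort is a no-op, drop it.
      if (optimizedChild.outputOrdering.startsWith(order)) optimizedChild
      else Sort(order, optimizedChild)
    case Project(cols, child) => Project(cols, apply(child))
    case leaf => leaf
  }
}

object SortEliminationDemo extends App {
  // Roughly the shape of the example query above: ORDER BY a on top of a projection
  // whose input is already sorted by a.
  val plan = Sort(Seq("a"), Project(Seq("a", "b"), Sort(Seq("a"), Relation("table1"))))
  println(EliminateRedundantSort(plan))
  // Only the inner Sort remains: Project(List(a, b),Sort(List(a),Relation(table1)))
}
```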
spark git commit: [SPARK-23896][SQL] Improve PartitioningAwareFileIndex
Repository: spark Updated Branches: refs/heads/master a83ae0d9b -> 4dfd746de [SPARK-23896][SQL] Improve PartitioningAwareFileIndex ## What changes were proposed in this pull request? Currently `PartitioningAwareFileIndex` accepts an optional parameter `userPartitionSchema`. If provided, it will combine the inferred partition schema with the parameter. However, 1. to get `userPartitionSchema`, we need to combine inferred partition schema with `userSpecifiedSchema` 2. to get the inferred partition schema, we have to create a temporary file index. Only after that, a final version of `PartitioningAwareFileIndex` can be created. This can be improved by passing `userSpecifiedSchema` to `PartitioningAwareFileIndex`. With the improvement, we can reduce redundant code and avoid parsing the file partition twice. ## How was this patch tested? Unit test Author: Gengliang Wang Closes #21004 from gengliangwang/PartitioningAwareFileIndex. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dfd746d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dfd746d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dfd746d Branch: refs/heads/master Commit: 4dfd746de3f4346ed0c2191f8523a7e6cc9f064d Parents: a83ae0d Author: Gengliang Wang Authored: Sat Apr 14 00:22:38 2018 +0800 Committer: Wenchen Fan Committed: Sat Apr 14 00:22:38 2018 +0800 -- .../datasources/CatalogFileIndex.scala | 2 +- .../sql/execution/datasources/DataSource.scala | 133 --- .../datasources/InMemoryFileIndex.scala | 8 +- .../PartitioningAwareFileIndex.scala| 54 +--- .../streaming/MetadataLogFileIndex.scala| 10 +- .../datasources/FileSourceStrategySuite.scala | 2 +- .../hive/PartitionedTablePerfStatsSuite.scala | 2 +- 7 files changed, 103 insertions(+), 108 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 4046396..a66a076 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -85,7 +85,7 @@ class CatalogFileIndex( sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs)) } else { new InMemoryFileIndex( -sparkSession, rootPaths, table.storage.properties, partitionSchema = None) +sparkSession, rootPaths, table.storage.properties, userSpecifiedSchema = None) } } http://git-wip-us.apache.org/repos/asf/spark/blob/4dfd746d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index b84ea76..f16d824 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -23,7 +23,6 @@ import scala.collection.JavaConverters._ import scala.language.{existentials, implicitConversions} import scala.util.{Failure, Success, Try} -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil @@ -104,24 
+103,6 @@ case class DataSource( } /** - * In the read path, only managed tables by Hive provide the partition columns properly when - * initializing this class. All other file based data sources will try to infer the partitioning, - * and then cast the inferred types to user specified dataTypes if the partition columns exist - * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or - * inconsistent data types as reported in SPARK-21463. - * @param fileIndex A FileIndex that will perform partition inference - * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema` - */ - private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: FileIndex): StructType = { -val resolved = fileIndex.partitionSchema.map { partitionField => - // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred - userSpecifiedSchema.flatMap(_.find
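For readers following the diff: the merging logic being moved is essentially "use the inferred partition columns, but let `userSpecifiedSchema` win on data types when it mentions the same column", which is what guards against the SPARK-18510/SPARK-21463 class of problems noted in the removed comment. A simplified, hypothetical sketch of that idea (plain case classes instead of Spark's `StructType`/`StructField`, and a naive case-insensitive resolver):

```scala
// Simplified stand-in for StructField; real Spark carries DataType, nullability, metadata.
case class Field(name: String, dataType: String)

object PartitionSchemaMergeSketch extends App {
  // Naive resolver; Spark uses the session's configured resolver (case-sensitivity setting).
  private def sameName(a: String, b: String): Boolean = a.equalsIgnoreCase(b)

  def mergePartitionSchema(inferred: Seq[Field], userSpecified: Option[Seq[Field]]): Seq[Field] =
    inferred.map { inferredField =>
      userSpecified
        .flatMap(_.find(f => sameName(f.name, inferredField.name))) // user-declared column wins
        .getOrElse(inferredField)                                   // otherwise keep the inferred type
    }

  // Partition discovery guesses `date` is an int (e.g. date=20180413), but the user declared a date.
  val inferred      = Seq(Field("date", "int"), Field("country", "string"))
  val userSpecified = Some(Seq(Field("date", "date")))
  println(mergePartitionSchema(inferred, userSpecified))
  // List(Field(date,date), Field(country,string))
}
```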
[1/3] spark git commit: [SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs
Repository: spark Updated Branches: refs/heads/master 0323e6146 -> a83ae0d9b http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala -- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala new file mode 100644 index 000..9d02f56 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.PodBuilder + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SecretVolumeUtils, SparkPod} + +class MountSecretsFeatureStepSuite extends SparkFunSuite { + + private val SECRET_FOO = "foo" + private val SECRET_BAR = "bar" + private val SECRET_MOUNT_PATH = "/etc/secrets/driver" + + test("mounts all given secrets") { +val baseDriverPod = SparkPod.initialPod() +val secretNamesToMountPaths = Map( + SECRET_FOO -> SECRET_MOUNT_PATH, + SECRET_BAR -> SECRET_MOUNT_PATH) +val sparkConf = new SparkConf(false) +val kubernetesConf = KubernetesConf( + sparkConf, + KubernetesExecutorSpecificConf("1", new PodBuilder().build()), + "resource-name-prefix", + "app-id", + Map.empty, + Map.empty, + secretNamesToMountPaths, + Map.empty) + +val step = new MountSecretsFeatureStep(kubernetesConf) +val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod +val driverContainerWithSecretsMounted = step.configurePod(baseDriverPod).container + +Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName => + assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName)) +} +Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName => + assert(SecretVolumeUtils.containerHasVolume( +driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)) +} + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala -- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 6a50159..c1b203e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -16,22 +16,17 @@ */ package org.apache.spark.deploy.k8s.submit -import scala.collection.JavaConverters._ - -import com.google.common.collect.Iterables import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, Watch} import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource} import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} import org.mockito.Mockito.{doReturn, verify, when} -import org.mockito.invocation.InvocationOnMock -import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfter import org.scalatest.mockito.MockitoSugar._ import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpec, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep class ClientSuite extends SparkFunSuite with BeforeAndAfter { @@ -39,6 +34,74 @@ class ClientSuite extends SparkFu
[2/3] spark git commit: [SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs
http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala deleted file mode 100644 index 43de329..000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStep.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.submit.steps - -import java.io.File - -import io.fabric8.kubernetes.api.model.ContainerBuilder - -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.KubernetesUtils -import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec - -/** - * Step that configures the classpath, spark.jars, and spark.files for the driver given that the - * user may provide remote files or files with local:// schemes. 
- */ -private[spark] class DependencyResolutionStep( -sparkJars: Seq[String], -sparkFiles: Seq[String]) extends DriverConfigurationStep { - - override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { -val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(sparkJars) -val resolvedSparkFiles = KubernetesUtils.resolveFileUrisAndPath(sparkFiles) - -val sparkConf = driverSpec.driverSparkConf.clone() -if (resolvedSparkJars.nonEmpty) { - sparkConf.set("spark.jars", resolvedSparkJars.mkString(",")) -} -if (resolvedSparkFiles.nonEmpty) { - sparkConf.set("spark.files", resolvedSparkFiles.mkString(",")) -} -val resolvedDriverContainer = if (resolvedSparkJars.nonEmpty) { - new ContainerBuilder(driverSpec.driverContainer) -.addNewEnv() - .withName(ENV_MOUNTED_CLASSPATH) - .withValue(resolvedSparkJars.mkString(File.pathSeparator)) - .endEnv() -.build() -} else { - driverSpec.driverContainer -} - -driverSpec.copy( - driverContainer = resolvedDriverContainer, - driverSparkConf = sparkConf) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala deleted file mode 100644 index 17614e0..000 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverConfigurationStep.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.submit.steps - -import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec - -/** - * Represents a step in configuring the Spark driver pod. - */ -private[spark] trait DriverConfigurationStep { - - /** - * Apply some transformation to the previous state o
[3/3] spark git commit: [SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs
[SPARK-22839][K8S] Refactor to unify driver and executor pod builder APIs ## What changes were proposed in this pull request? Breaks down the construction of driver pods and executor pods in a way that uses a common abstraction for both spark-submit creating the driver and KubernetesClusterSchedulerBackend creating the executor. Encourages more code reuse and is more legible than the older approach. The high-level design is discussed in more detail on the JIRA ticket. This pull request is the implementation of that design with some minor changes in the implementation details. No user-facing behavior should break as a result of this change. ## How was this patch tested? Migrated all unit tests from the old submission steps architecture to the new architecture. Integration tests should not have to change and pass given that this shouldn't change any outward behavior. Author: mcheah Closes #20910 from mccheah/spark-22839-incremental. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a83ae0d9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a83ae0d9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a83ae0d9 Branch: refs/heads/master Commit: a83ae0d9bc1b8f4909b9338370efe4020079bea7 Parents: 0323e61 Author: mcheah Authored: Fri Apr 13 08:43:58 2018 -0700 Committer: Anirudh Ramanathan Committed: Fri Apr 13 08:43:58 2018 -0700 -- .../org/apache/spark/deploy/k8s/Config.scala| 2 +- .../spark/deploy/k8s/KubernetesConf.scala | 184 ++ .../spark/deploy/k8s/KubernetesDriverSpec.scala | 31 +++ .../spark/deploy/k8s/KubernetesUtils.scala | 11 - .../deploy/k8s/MountSecretsBootstrap.scala | 72 -- .../org/apache/spark/deploy/k8s/SparkPod.scala | 34 +++ .../k8s/features/BasicDriverFeatureStep.scala | 136 ++ .../k8s/features/BasicExecutorFeatureStep.scala | 179 ++ ...DriverKubernetesCredentialsFeatureStep.scala | 216 .../k8s/features/DriverServiceFeatureStep.scala | 97 .../features/KubernetesFeatureConfigStep.scala | 71 ++ .../k8s/features/MountSecretsFeatureStep.scala | 62 + .../k8s/submit/DriverConfigOrchestrator.scala | 145 --- .../submit/KubernetesClientApplication.scala| 80 +++--- .../k8s/submit/KubernetesDriverBuilder.scala| 56 + .../k8s/submit/KubernetesDriverSpec.scala | 47 .../steps/BasicDriverConfigurationStep.scala| 163 .../submit/steps/DependencyResolutionStep.scala | 61 - .../submit/steps/DriverConfigurationStep.scala | 30 --- .../steps/DriverKubernetesCredentialsStep.scala | 245 --- .../submit/steps/DriverMountSecretsStep.scala | 38 --- .../steps/DriverServiceBootstrapStep.scala | 104 .../cluster/k8s/ExecutorPodFactory.scala| 227 - .../cluster/k8s/KubernetesClusterManager.scala | 12 +- .../k8s/KubernetesClusterSchedulerBackend.scala | 20 +- .../cluster/k8s/KubernetesExecutorBuilder.scala | 41 .../spark/deploy/k8s/KubernetesConfSuite.scala | 175 + .../spark/deploy/k8s/KubernetesUtilsTest.scala | 36 --- .../features/BasicDriverFeatureStepSuite.scala | 153 .../BasicExecutorFeatureStepSuite.scala | 179 ++ ...rKubernetesCredentialsFeatureStepSuite.scala | 174 + .../DriverServiceFeatureStepSuite.scala | 227 + .../features/KubernetesFeaturesTestUtils.scala | 61 + .../features/MountSecretsFeatureStepSuite.scala | 58 + .../spark/deploy/k8s/submit/ClientSuite.scala | 216 .../submit/DriverConfigOrchestratorSuite.scala | 131 -- .../submit/KubernetesDriverBuilderSuite.scala | 102 .../BasicDriverConfigurationStepSuite.scala | 122 - .../steps/DependencyResolutionStepSuite.scala | 69 -- .../DriverKubernetesCredentialsStepSuite.scala 
| 153 .../steps/DriverMountSecretsStepSuite.scala | 49 .../steps/DriverServiceBootstrapStepSuite.scala | 180 -- .../cluster/k8s/ExecutorPodFactorySuite.scala | 195 --- ...KubernetesClusterSchedulerBackendSuite.scala | 37 +-- .../k8s/KubernetesExecutorBuilderSuite.scala| 75 ++ 45 files changed, 2482 insertions(+), 2274 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a83ae0d9/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala -- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 82f6c71..4086970 100644 --- a/reso
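The shape of the new design, going by the description and the file list: each concern (driver basics, executor basics, credentials, driver service, secret mounts, ...) becomes a small feature step implementing `KubernetesFeatureConfigStep`, and both `KubernetesDriverBuilder` and `KubernetesExecutorBuilder` fold the applicable steps over an initial `SparkPod`. A condensed, hypothetical sketch of that pattern with toy types (no fabric8 classes; the real interface also contributes system properties and additional Kubernetes resources):

```scala
// Toy pod model instead of the fabric8 Pod/Container pair carried by SparkPod.
case class PodSketch(labels: Map[String, String], volumes: Seq[String])
object PodSketch { def initialPod(): PodSketch = PodSketch(Map.empty, Nil) }

// Analogous to KubernetesFeatureConfigStep: a pure pod -> pod transformation.
trait FeatureStep {
  def configurePod(pod: PodSketch): PodSketch
}

class BasicAppStep(appId: String) extends FeatureStep {
  override def configurePod(pod: PodSketch): PodSketch =
    pod.copy(labels = pod.labels + ("spark-app-id" -> appId))
}

class MountSecretsStep(secretNamesToMountPaths: Map[String, String]) extends FeatureStep {
  override def configurePod(pod: PodSketch): PodSketch =
    pod.copy(volumes = pod.volumes ++ secretNamesToMountPaths.keys.map(name => s"$name-volume"))
}

// Driver and executor builders become the same fold over an ordered list of steps;
// that shared shape is where the code reuse comes from.
class PodBuilderSketch(steps: Seq[FeatureStep]) {
  def build(): PodSketch = steps.foldLeft(PodSketch.initialPod())((pod, step) => step.configurePod(pod))
}

object FeatureStepDemo extends App {
  val pod = new PodBuilderSketch(Seq(
    new BasicAppStep("app-id"),
    new MountSecretsStep(Map("foo" -> "/etc/secrets/driver", "bar" -> "/etc/secrets/driver"))
  )).build()
  println(pod.labels)  // spark-app-id -> app-id
  println(pod.volumes) // foo-volume, bar-volume
}
```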
svn commit: r26320 - in /dev/spark/2.3.1-SNAPSHOT-2018_04_13_02_01-dfdf1bb-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Apr 13 09:15:58 2018 New Revision: 26320 Log: Apache Spark 2.3.1-SNAPSHOT-2018_04_13_02_01-dfdf1bb docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50, so it was shortened to this summary.]
svn commit: r26319 - in /dev/spark/2.4.0-SNAPSHOT-2018_04_13_00_01-0323e61-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Apr 13 07:17:57 2018 New Revision: 26319 Log: Apache Spark 2.4.0-SNAPSHOT-2018_04_13_00_01-0323e61 docs [This commit notification would consist of 1458 parts, which exceeds the limit of 50, so it was shortened to this summary.]
spark git commit: [SPARK-23905][SQL] Add UDF weekday
Repository: spark Updated Branches: refs/heads/master 4b0703679 -> 0323e6146 [SPARK-23905][SQL] Add UDF weekday ## What changes were proposed in this pull request? Add UDF weekday ## How was this patch tested? A new test Author: yucai Closes #21009 from yucai/SPARK-23905. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0323e614 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0323e614 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0323e614 Branch: refs/heads/master Commit: 0323e61465ee747c9a57a70e9d6108876499546e Parents: 4b07036 Author: yucai Authored: Fri Apr 13 00:00:04 2018 -0700 Committer: gatorsmile Committed: Fri Apr 13 00:00:04 2018 -0700 -- .../catalyst/analysis/FunctionRegistry.scala| 1 + .../expressions/datetimeExpressions.scala | 55 .../expressions/DateExpressionsSuite.scala | 11 .../resources/sql-tests/inputs/datetime.sql | 2 + .../sql-tests/results/datetime.sql.out | 9 +++- 5 files changed, 67 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0323e614/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 747016b..131b958 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -395,6 +395,7 @@ object FunctionRegistry { expression[TruncTimestamp]("date_trunc"), expression[UnixTimestamp]("unix_timestamp"), expression[DayOfWeek]("dayofweek"), +expression[WeekDay]("weekday"), expression[WeekOfYear]("weekofyear"), expression[Year]("year"), expression[TimeWindow]("window"), http://git-wip-us.apache.org/repos/asf/spark/blob/0323e614/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 32fdb13..b9b2cd5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -426,36 +426,71 @@ case class DayOfMonth(child: Expression) extends UnaryExpression with ImplicitCa """, since = "2.3.0") // scalastyle:on line.size.limit -case class DayOfWeek(child: Expression) extends UnaryExpression with ImplicitCastInputTypes { +case class DayOfWeek(child: Expression) extends DayWeek { - override def inputTypes: Seq[AbstractDataType] = Seq(DateType) - - override def dataType: DataType = IntegerType + override protected def nullSafeEval(date: Any): Any = { +cal.setTimeInMillis(date.asInstanceOf[Int] * 1000L * 3600L * 24L) +cal.get(Calendar.DAY_OF_WEEK) + } - @transient private lazy val c = { -Calendar.getInstance(DateTimeUtils.getTimeZone("UTC")) + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, time => { + val cal = classOf[Calendar].getName + val dtu = DateTimeUtils.getClass.getName.stripSuffix("$") + val c = "calDayOfWeek" + ctx.addImmutableStateIfNotExists(cal, c, +v => s"""$v = $cal.getInstance($dtu.getTimeZone("UTC"));""") + s""" 
+$c.setTimeInMillis($time * 1000L * 3600L * 24L); +${ev.value} = $c.get($cal.DAY_OF_WEEK); + """ +}) } +} + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(date) - Returns the day of the week for date/timestamp (0 = Monday, 1 = Tuesday, ..., 6 = Sunday).", + examples = """ +Examples: + > SELECT _FUNC_('2009-07-30'); + 3 + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class WeekDay(child: Expression) extends DayWeek { override protected def nullSafeEval(date: Any): Any = { -c.setTimeInMillis(date.asInstanceOf[Int] * 1000L * 3600L * 24L) -c.get(Calendar.DAY_OF_WEEK) +cal.setTimeInMillis(date.asInstanceOf[Int] * 1000L * 3600L * 24L) +(cal.get(Calendar.DAY_OF_WEEK) + 5 ) % 7 } override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { nullSafeCodeGen(ctx, ev, time => { val cal = classOf[Calendar].getName val dtu
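The arithmetic behind the new `WeekDay` expression, for anyone checking the diff: `java.util.Calendar` numbers days 1 = Sunday through 7 = Saturday, and `(DAY_OF_WEEK + 5) % 7` remaps that to the documented 0 = Monday through 6 = Sunday. A standalone check of the formula against the example in the `ExpressionDescription` (plain JVM code, not Spark):

```scala
import java.util.{Calendar, TimeZone}

object WeekDayFormulaCheck extends App {
  val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
  cal.clear()
  cal.set(2009, Calendar.JULY, 30)                // 2009-07-30, a Thursday
  val dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)   // 5, since Calendar counts 1 = Sunday
  val weekday = (dayOfWeek + 5) % 7
  println(weekday)                                // 3, matching SELECT weekday('2009-07-30')
}
```

In SQL, once this patch is in, `SELECT weekday('2009-07-30')` returns 3, as shown in the `ExpressionDescription` example above.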