spark git commit: [SPARK-23418][SQL] Fail DataSourceV2 reads when user schema is passed, but not supported.
Repository: spark Updated Branches: refs/heads/master 95e25ed1a -> c8c4441df [SPARK-23418][SQL] Fail DataSourceV2 reads when user schema is passed, but not supported. ## What changes were proposed in this pull request? DataSourceV2 initially allowed user-supplied schemas when a source doesn't implement `ReadSupportWithSchema`, as long as the schema was identical to the source's schema. This is confusing behavior because changes to an underlying table can cause a previously working job to fail with an exception that user-supplied schemas are not allowed. This reverts commit adcb25a0624, which was added to #20387 so that it could be removed in a separate JIRA issue and PR. ## How was this patch tested? Existing tests. Author: Ryan Blue Closes #20603 from rdblue/SPARK-23418-revert-adcb25a0624. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8c4441d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8c4441d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8c4441d Branch: refs/heads/master Commit: c8c4441dfdfeda22f8d92e25aee1b6a6269752f9 Parents: 95e25ed Author: Ryan Blue Authored: Wed Feb 21 15:10:08 2018 +0800 Committer: Wenchen Fan Committed: Wed Feb 21 15:10:08 2018 +0800 -- .../datasources/v2/DataSourceV2Relation.scala | 13 + 1 file changed, 1 insertion(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c8c4441d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index a98dd48..cc6cb63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -174,13 +174,6 @@ object DataSourceV2Relation { v2Options: DataSourceOptions, userSchema: Option[StructType]): StructType = { val reader = userSchema match { - // TODO: remove this case because it is confusing for users - case Some(s) if !source.isInstanceOf[ReadSupportWithSchema] => -val reader = source.asReadSupport.createReader(v2Options) -if (reader.readSchema() != s) { - throw new AnalysisException(s"${source.name} does not allow user-specified schemas.") -} -reader case Some(s) => source.asReadSupportWithSchema.createReader(s, v2Options) case _ => @@ -195,11 +188,7 @@ object DataSourceV2Relation { filters: Option[Seq[Expression]] = None, userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = { val projection = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes -DataSourceV2Relation(source, options, projection, filters, - // if the source does not implement ReadSupportWithSchema, then the userSpecifiedSchema must - // be equal to the reader's schema. the schema method enforces this. because the user schema - // and the reader's schema are identical, drop the user schema. - if (source.isInstanceOf[ReadSupportWithSchema]) userSpecifiedSchema else None) +DataSourceV2Relation(source, options, projection, filters, userSpecifiedSchema) } private def pushRequiredColumns(reader: DataSourceReader, struct: StructType): Unit = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
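To make the new behavior concrete, here is a minimal, self-contained sketch of the read-path rule this revert leaves in place. The traits and the exception type below are simplified stand-ins, not Spark's actual `ReadSupport`/`ReadSupportWithSchema` interfaces or `AnalysisException`; the point is only that a user-supplied schema now fails fast unless the source explicitly supports one.

```scala
object SchemaResolutionSketch {
  // A schema is modeled as (name, type) pairs instead of a StructType.
  type Schema = Seq[(String, String)]
  trait Reader { def readSchema(): Schema }
  trait ReadSupport { def createReader(): Reader }
  trait ReadSupportWithSchema { def createReader(schema: Schema): Reader }

  // After SPARK-23418: a user-supplied schema is honored only when the source
  // implements ReadSupportWithSchema; otherwise the read fails immediately, instead
  // of being accepted whenever it happens to equal the source's own schema.
  def createReader(source: AnyRef, sourceName: String, userSchema: Option[Schema]): Reader =
    (userSchema, source) match {
      case (Some(s), ws: ReadSupportWithSchema) => ws.createReader(s)
      case (Some(_), _) =>
        throw new IllegalArgumentException(s"$sourceName does not allow user-specified schemas.")
      case (None, rs: ReadSupport) => rs.createReader()
      case _ =>
        throw new IllegalArgumentException(s"$sourceName does not support reads.")
    }
}
```

Failing unconditionally makes the contract explicit: under the reverted special case, a table change could turn a previously working job into one that fails with a message about user-specified schemas, which is the confusing behavior described above.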
svn commit: r25179 - in /dev/spark/2.3.1-SNAPSHOT-2018_02_20_22_01-3e7269e-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Wed Feb 21 06:15:33 2018
New Revision: 25179
Log: Apache Spark 2.3.1-SNAPSHOT-2018_02_20_22_01-3e7269e docs

[This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r25177 - in /dev/spark/2.4.0-SNAPSHOT-2018_02_20_20_01-95e25ed-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Wed Feb 21 04:15:56 2018
New Revision: 25177
Log: Apache Spark 2.4.0-SNAPSHOT-2018_02_20_20_01-95e25ed docs

[This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23424][SQL] Add codegenStageId in comment
Repository: spark Updated Branches: refs/heads/master 601d653bf -> 95e25ed1a [SPARK-23424][SQL] Add codegenStageId in comment ## What changes were proposed in this pull request? This PR always adds `codegenStageId` in comment of the generated class. This is a replication of #20419 for post-Spark 2.3. Closes #20419 ``` /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ /* 005 */ // codegenStageId=1 /* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 007 */ private Object[] references; ... ``` ## How was this patch tested? Existing tests Author: Kazuaki Ishizaki Closes #20612 from kiszk/SPARK-23424. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95e25ed1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95e25ed1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95e25ed1 Branch: refs/heads/master Commit: 95e25ed1a8b56937345eff637c0032aea85a503d Parents: 601d653 Author: Kazuaki Ishizaki Authored: Wed Feb 21 11:26:06 2018 +0800 Committer: Wenchen Fan Committed: Wed Feb 21 11:26:06 2018 +0800 -- .../expressions/codegen/CodeGenerator.scala | 21 +--- .../sql/execution/WholeStageCodegenExec.scala | 4 +++- 2 files changed, 21 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/95e25ed1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 31ba29a..60a6f50 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1232,14 +1232,29 @@ class CodegenContext { /** * Register a comment and return the corresponding place holder + * + * @param placeholderId an optionally specified identifier for the comment's placeholder. + * The caller should make sure this identifier is unique within the + * compilation unit. If this argument is not specified, a fresh identifier + * will be automatically created and used as the placeholder. + * @param force whether to force registering the comments */ - def registerComment(text: => String): String = { + def registerComment( + text: => String, + placeholderId: String = "", + force: Boolean = false): String = { // By default, disable comments in generated code because computing the comments themselves can // be extremely expensive in certain cases, such as deeply-nested expressions which operate over // inputs with wide schemas. For more details on the performance issues that motivated this // flat, see SPARK-15680. 
-if (SparkEnv.get != null && SparkEnv.get.conf.getBoolean("spark.sql.codegen.comments", false)) { - val name = freshName("c") +if (force || + SparkEnv.get != null && SparkEnv.get.conf.getBoolean("spark.sql.codegen.comments", false)) { + val name = if (placeholderId != "") { +assert(!placeHolderToComments.contains(placeholderId)) +placeholderId + } else { +freshName("c") + } val comment = if (text.contains("\n") || text.contains("\r")) { text.split("(\r\n)|\r|\n").mkString("/**\n * ", "\n * ", "\n */") } else { http://git-wip-us.apache.org/repos/asf/spark/blob/95e25ed1/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 0e525b1..deb0a04 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -540,7 +540,9 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) ${ctx.registerComment( s"""Codegend pipeline for stage (id=$codegenStageId) - |${this.treeString.trim}""".stripMargin)} + |${this.treeString.trim}""".stripMargin, + "wsc_codegenPipeline")} + ${ctx.registerComment(s"codegenStageId=$codegenStageId", "wsc_codegenStageId", tr
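To illustrate the extended `registerComment` contract, here is a small stand-alone sketch of a comment registry with a caller-chosen placeholder id and a `force` flag. This is simplified and not the actual `CodegenContext` implementation; names and the returned placeholder format are illustrative.

```scala
import scala.collection.mutable

// A small stand-in for CodegenContext's comment registry (simplified, not the real class).
class CommentRegistrySketch(commentsEnabled: Boolean) {
  private val placeholderToComment = mutable.Map.empty[String, String]
  private var counter = 0

  /** Returns the placeholder to embed in generated code, or "" if the comment is skipped. */
  def registerComment(text: => String, placeholderId: String = "", force: Boolean = false): String = {
    // Comments are normally disabled because building them can be expensive for deep
    // expression trees; `force = true` keeps cheap, important markers such as
    // "codegenStageId=N" in the output regardless of the setting.
    if (!(force || commentsEnabled)) return ""
    val name =
      if (placeholderId.nonEmpty) {
        require(!placeholderToComment.contains(placeholderId), s"duplicate placeholder: $placeholderId")
        placeholderId
      } else {
        counter += 1
        s"c$counter"
      }
    placeholderToComment(name) = s"// $text"
    s"/*$name*/"
  }
}
```

A caller such as `WholeStageCodegenExec` then registers the stage marker with an explicit id, e.g. `registerComment(s"codegenStageId=$codegenStageId", "wsc_codegenStageId", force = true)`; the diff above is truncated at that argument, so the `force` value shown here is inferred rather than quoted.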
spark git commit: [SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status
Repository: spark Updated Branches: refs/heads/branch-2.1 1df8020e1 -> 24fe6eb0f [SPARK-23053][CORE][BRANCH-2.1] taskBinarySerialization and task partitions calculate in DagScheduler.submitMissingTasks should keep the same RDD checkpoint status ## What changes were proposed in this pull request? This PR backports [#20244](https://github.com/apache/spark/pull/20244) When we run concurrent jobs using the same rdd which is marked to do checkpoint. If one job has finished running the job, and start the process of RDD.doCheckpoint, while another job is submitted, then submitStage and submitMissingTasks will be called. In [submitMissingTasks](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L932), will serialize taskBinaryBytes and calculate task partitions which are both affected by the status of checkpoint, if the former is calculated before doCheckpoint finished, while the latter is calculated after doCheckpoint finished, when run task, rdd.compute will be called, for some rdds with particular partition type such as [UnionRDD](https://github.com/apache/spark/blob/branch-2.1/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala) who will do partition type cast, will get a ClassCastException because the part params is actually a CheckpointRDDPartition. This error occurs because rdd.doCheckpoint occurs in the same thread that called sc.runJob, while the task serialization occurs in the DAGSchedulers event loop. ## How was this patch tested? the exist tests. Author: huangtengfei Closes #20635 from ivoson/branch-2.1-23053. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24fe6eb0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24fe6eb0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24fe6eb0 Branch: refs/heads/branch-2.1 Commit: 24fe6eb0f6e2fe31d1c93fc65bc294ebb94e2dcf Parents: 1df8020 Author: huangtengfei Authored: Tue Feb 20 21:01:45 2018 -0600 Committer: Imran Rashid Committed: Tue Feb 20 21:01:45 2018 -0600 -- .../apache/spark/scheduler/DAGScheduler.scala | 27 +--- 1 file changed, 18 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/24fe6eb0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 01a95c0..9d46d69 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{RDD, RDDCheckpointData} import org.apache.spark.rpc.RpcTimeout import org.apache.spark.storage._ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat @@ -982,15 +982,24 @@ class DAGScheduler( // might modify state of objects referenced in their closures. This is necessary in Hadoop // where the JobConf/Configuration object is not thread-safe. var taskBinary: Broadcast[Array[Byte]] = null +var partitions: Array[Partition] = null try { // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). 
- val taskBinaryBytes: Array[Byte] = stage match { -case stage: ShuffleMapStage => - JavaUtils.bufferToArray( -closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) -case stage: ResultStage => - JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) + var taskBinaryBytes: Array[Byte] = null + // taskBinaryBytes and partitions are both effected by the checkpoint status. We need + // this synchronization in case another concurrent job is checkpointing this RDD, so we get a + // consistent view of both variables. + RDDCheckpointData.synchronized { +taskBinaryBytes = stage match { + case stage: ShuffleMapStage => +JavaUtils.bufferToArray( + closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) + case stage: ResultStage => +JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) +} + +partitions = stage.rdd.partitions } taskBinary = sc.broadcast(taskBinaryBytes) @@ -1013,7 +102
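The heart of the fix is easier to see in isolation: the serialized task closure and the partition array must be read under one lock, so a concurrent `doCheckpoint()` cannot slip in between them. Below is a stripped-down sketch, with a plain lock object standing in for `RDDCheckpointData` and the stage-specific work passed in as functions; it is not the DAGScheduler code itself.

```scala
object CheckpointLockSketch {
  // Stand-in for the class-level lock that RDDCheckpointData.synchronized provides.
  private val lock = new Object

  final case class TaskSetSnapshot(taskBinary: Array[Byte], partitionIds: Array[Int])

  def snapshotForSubmission(
      serializeStage: () => Array[Byte],
      currentPartitionIds: () => Array[Int]): TaskSetSnapshot =
    lock.synchronized {
      // Without the shared lock, a concurrent doCheckpoint() could rewrite the RDD's
      // dependencies and partitions between these two calls; the task binary and the
      // partition list would then disagree, surfacing as a ClassCastException at run time.
      TaskSetSnapshot(serializeStage(), currentPartitionIds())
    }
}
```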
svn commit: r25174 - in /dev/spark/2.3.1-SNAPSHOT-2018_02_20_18_02-a1ee6f1-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Wed Feb 21 02:17:26 2018
New Revision: 25174
Log: Apache Spark 2.3.1-SNAPSHOT-2018_02_20_18_02-a1ee6f1 docs

[This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23454][SS][DOCS] Added trigger information to the Structured Streaming programming guide
Repository: spark Updated Branches: refs/heads/master 6d398c05c -> 601d653bf [SPARK-23454][SS][DOCS] Added trigger information to the Structured Streaming programming guide ## What changes were proposed in this pull request? - Added clear information about triggers - Made the semantics guarantees of watermarks more clear for streaming aggregations and stream-stream joins. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tathagata Das Closes #20631 from tdas/SPARK-23454. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/601d653b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/601d653b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/601d653b Branch: refs/heads/master Commit: 601d653bff9160db8477f86d961e609fc2190237 Parents: 6d398c0 Author: Tathagata Das Authored: Tue Feb 20 18:16:10 2018 -0800 Committer: Tathagata Das Committed: Tue Feb 20 18:16:10 2018 -0800 -- docs/structured-streaming-programming-guide.md | 214 +++- 1 file changed, 207 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/601d653b/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 48d6d0b..9a83f15 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -904,7 +904,7 @@ windowedCounts <- count( -### Handling Late Data and Watermarking + Handling Late Data and Watermarking Now consider what happens if one of the events arrives late to the application. For example, say, a word generated at 12:04 (i.e. event time) could be received by the application at 12:11. The application should use the time 12:04 instead of 12:11 @@ -925,7 +925,9 @@ specifying the event time column and the threshold on how late the data is expec event time. For a specific window starting at time `T`, the engine will maintain state and allow late data to update the state until `(max event time seen by the engine - late threshold > T)`. In other words, late data within the threshold will be aggregated, -but data later than the threshold will be dropped. Let's understand this with an example. We can +but data later than the threshold will start getting dropped +(see [later]((#semantic-guarantees-of-aggregation-with-watermarking)) +in the section for the exact guarantees). Let's understand this with an example. We can easily define watermarking on the previous example using `withWatermark()` as shown below. @@ -1031,7 +1033,9 @@ then drops intermediate state of a window < watermark, and appends the final counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` is appended to the Result Table only after the watermark is updated to `12:11`. -**Conditions for watermarking to clean aggregation state** +# Conditions for watermarking to clean aggregation state +{:.no_toc} + It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries *(as of Spark 2.1.1, subject to change in the future)*. @@ -1051,6 +1055,16 @@ from the aggregation column. 
For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append output mode. +# Semantic Guarantees of Aggregation with Watermarking +{:.no_toc} + +- A watermark delay (set with `withWatermark`) of "2 hours" guarantees that the engine will never +drop any data that is less than 2 hours delayed. In other words, any data less than 2 hours behind +(in terms of event-time) the latest data processed till then is guaranteed to be aggregated. + +- However, the guarantee is strict only in one direction. Data delayed by more than 2 hours is +not guaranteed to be dropped; it may or may not get aggregated. More delayed is the data, less +likely is the engine going to process it. ### Join Operations Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame @@ -1062,7 +1076,7 @@ Dataset/DataFrame will be the exactly the same as if it was with a static Datase containing the same data in the stream. - Stream-static joins + Stream-static Joins Since the introduction in Spark 2.0, Structured Streaming has supported joins (inner join and some type of outer joins) between a streaming and a static DataFrame/Dataset. Here i
spark git commit: [SPARK-23454][SS][DOCS] Added trigger information to the Structured Streaming programming guide
Repository: spark Updated Branches: refs/heads/branch-2.3 1d78f03ae -> 3e7269eb9 [SPARK-23454][SS][DOCS] Added trigger information to the Structured Streaming programming guide ## What changes were proposed in this pull request? - Added clear information about triggers - Made the semantics guarantees of watermarks more clear for streaming aggregations and stream-stream joins. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Tathagata Das Closes #20631 from tdas/SPARK-23454. (cherry picked from commit 601d653bff9160db8477f86d961e609fc2190237) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e7269eb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e7269eb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e7269eb Branch: refs/heads/branch-2.3 Commit: 3e7269eb904b591883300d7433e5c99be0b3b5b3 Parents: 1d78f03 Author: Tathagata Das Authored: Tue Feb 20 18:16:10 2018 -0800 Committer: Tathagata Das Committed: Tue Feb 20 18:16:23 2018 -0800 -- docs/structured-streaming-programming-guide.md | 214 +++- 1 file changed, 207 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3e7269eb/docs/structured-streaming-programming-guide.md -- diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index 48d6d0b..9a83f15 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -904,7 +904,7 @@ windowedCounts <- count( -### Handling Late Data and Watermarking + Handling Late Data and Watermarking Now consider what happens if one of the events arrives late to the application. For example, say, a word generated at 12:04 (i.e. event time) could be received by the application at 12:11. The application should use the time 12:04 instead of 12:11 @@ -925,7 +925,9 @@ specifying the event time column and the threshold on how late the data is expec event time. For a specific window starting at time `T`, the engine will maintain state and allow late data to update the state until `(max event time seen by the engine - late threshold > T)`. In other words, late data within the threshold will be aggregated, -but data later than the threshold will be dropped. Let's understand this with an example. We can +but data later than the threshold will start getting dropped +(see [later]((#semantic-guarantees-of-aggregation-with-watermarking)) +in the section for the exact guarantees). Let's understand this with an example. We can easily define watermarking on the previous example using `withWatermark()` as shown below. @@ -1031,7 +1033,9 @@ then drops intermediate state of a window < watermark, and appends the final counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` is appended to the Result Table only after the watermark is updated to `12:11`. -**Conditions for watermarking to clean aggregation state** +# Conditions for watermarking to clean aggregation state +{:.no_toc} + It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries *(as of Spark 2.1.1, subject to change in the future)*. 
@@ -1051,6 +1055,16 @@ from the aggregation column. For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append output mode. +# Semantic Guarantees of Aggregation with Watermarking +{:.no_toc} + +- A watermark delay (set with `withWatermark`) of "2 hours" guarantees that the engine will never +drop any data that is less than 2 hours delayed. In other words, any data less than 2 hours behind +(in terms of event-time) the latest data processed till then is guaranteed to be aggregated. + +- However, the guarantee is strict only in one direction. Data delayed by more than 2 hours is +not guaranteed to be dropped; it may or may not get aggregated. More delayed is the data, less +likely is the engine going to process it. ### Join Operations Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame @@ -1062,7 +1076,7 @@ Dataset/DataFrame will be the exactly the same as if it was with a static Datase containing the same data in the stream. - Stream-static joins + Stream-static Joins Since the introduction in Spark 2.0, Structured Streaming has supported j
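The other half of this documentation change describes triggers. For reference, the trigger API the new section documents looks like the sketch below; the rate source and console sink are used only for illustration, and each `start()` shows an alternative trigger choice rather than queries meant to run together.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("trigger-sketch").getOrCreate()
val df = spark.readStream.format("rate").load()   // built-in rate source, for illustration only

// 1. Default (no trigger): run micro-batches as soon as the previous one finishes.
df.writeStream.format("console").start()

// 2. Fixed-interval micro-batches.
df.writeStream.format("console").trigger(Trigger.ProcessingTime("2 seconds")).start()

// 3. One-time micro-batch, then stop.
df.writeStream.format("console").trigger(Trigger.Once()).start()

// 4. Continuous processing (experimental in 2.3) with a 1-second checkpoint interval.
df.writeStream.format("console").trigger(Trigger.Continuous("1 second")).start()
```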
spark git commit: [SPARK-23468][CORE] Stringify auth secret before storing it in credentials.
Repository: spark Updated Branches: refs/heads/branch-2.3 a1ee6f1fc -> 1d78f03ae [SPARK-23468][CORE] Stringify auth secret before storing it in credentials. The secret is used as a string in many parts of the code, so it has to be turned into a hex string to avoid issues such as the random byte sequence not containing a valid UTF8 sequence. Author: Marcelo Vanzin Closes #20643 from vanzin/SPARK-23468. (cherry picked from commit 6d398c05cbad69aa9093429e04ae44c73b81cd5a) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d78f03a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d78f03a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d78f03a Branch: refs/heads/branch-2.3 Commit: 1d78f03ae3037f1ddbe6533a6733b7805a6705bf Parents: a1ee6f1 Author: Marcelo Vanzin Authored: Tue Feb 20 18:06:21 2018 -0800 Committer: Marcelo Vanzin Committed: Tue Feb 20 18:06:31 2018 -0800 -- core/src/main/scala/org/apache/spark/SecurityManager.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1d78f03a/core/src/main/scala/org/apache/spark/SecurityManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 4c1dbe3..5b15a1c 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -541,7 +541,8 @@ private[spark] class SecurityManager( rnd.nextBytes(secretBytes) val creds = new Credentials() -creds.addSecretKey(SECRET_LOOKUP_KEY, secretBytes) +val secretStr = HashCodes.fromBytes(secretBytes).toString() +creds.addSecretKey(SECRET_LOOKUP_KEY, secretStr.getBytes(UTF_8)) UserGroupInformation.getCurrentUser().addCredentials(creds) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
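The one-line fix stores `HashCodes.fromBytes(secretBytes).toString()` instead of the raw bytes. Here is a dependency-free sketch of the same idea, hex-encoding a random secret before treating it as a string; it is simplified, and Spark itself uses the HashCodes helper visible in the diff.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.security.SecureRandom

val rnd = new SecureRandom()
val secretBytes = new Array[Byte](32)
rnd.nextBytes(secretBytes)

// Raw random bytes are not necessarily valid UTF-8, so code that round-trips the secret
// through a String can corrupt it. Hex-encoding first gives a stable representation.
val secretStr = secretBytes.map(b => f"$b%02x").mkString
val storedBytes = secretStr.getBytes(UTF_8)   // what ends up in the Credentials entry
```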
spark git commit: [SPARK-23468][CORE] Stringify auth secret before storing it in credentials.
Repository: spark Updated Branches: refs/heads/master 2ba77ed9e -> 6d398c05c [SPARK-23468][CORE] Stringify auth secret before storing it in credentials. The secret is used as a string in many parts of the code, so it has to be turned into a hex string to avoid issues such as the random byte sequence not containing a valid UTF8 sequence. Author: Marcelo Vanzin Closes #20643 from vanzin/SPARK-23468. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d398c05 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d398c05 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d398c05 Branch: refs/heads/master Commit: 6d398c05cbad69aa9093429e04ae44c73b81cd5a Parents: 2ba77ed Author: Marcelo Vanzin Authored: Tue Feb 20 18:06:21 2018 -0800 Committer: Marcelo Vanzin Committed: Tue Feb 20 18:06:21 2018 -0800 -- core/src/main/scala/org/apache/spark/SecurityManager.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6d398c05/core/src/main/scala/org/apache/spark/SecurityManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 4c1dbe3..5b15a1c 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -541,7 +541,8 @@ private[spark] class SecurityManager( rnd.nextBytes(secretBytes) val creds = new Credentials() -creds.addSecretKey(SECRET_LOOKUP_KEY, secretBytes) +val secretStr = HashCodes.fromBytes(secretBytes).toString() +creds.addSecretKey(SECRET_LOOKUP_KEY, secretStr.getBytes(UTF_8)) UserGroupInformation.getCurrentUser().addCredentials(creds) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23470][UI] Use first attempt of last stage to define job description.
Repository: spark Updated Branches: refs/heads/branch-2.3 c7a0dea46 -> a1ee6f1fc [SPARK-23470][UI] Use first attempt of last stage to define job description. This is much faster than finding out what the last attempt is, and the data should be the same. There's room for improvement in this page (like only loading data for the jobs being shown, instead of loading all available jobs and sorting them), but this should bring performance on par with the 2.2 version. Author: Marcelo Vanzin Closes #20644 from vanzin/SPARK-23470. (cherry picked from commit 2ba77ed9e51922303e3c3533e368b95788bd7de5) Signed-off-by: Sameer Agarwal Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1ee6f1f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1ee6f1f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1ee6f1f Branch: refs/heads/branch-2.3 Commit: a1ee6f1fc543120763f1b373bb31bc6d84004318 Parents: c7a0dea Author: Marcelo Vanzin Authored: Tue Feb 20 17:54:06 2018 -0800 Committer: Sameer Agarwal Committed: Tue Feb 20 17:54:17 2018 -0800 -- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a1ee6f1f/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index a4710f6..08a927a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -1040,7 +1040,7 @@ private[ui] object ApiHelper { } def lastStageNameAndDescription(store: AppStatusStore, job: JobData): (String, String) = { -val stage = store.asOption(store.lastStageAttempt(job.stageIds.max)) +val stage = store.asOption(store.stageAttempt(job.stageIds.max, 0)) (stage.map(_.name).getOrElse(""), stage.flatMap(_.description).getOrElse(job.name)) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23470][UI] Use first attempt of last stage to define job description.
Repository: spark Updated Branches: refs/heads/master 3e48f3b9e -> 2ba77ed9e [SPARK-23470][UI] Use first attempt of last stage to define job description. This is much faster than finding out what the last attempt is, and the data should be the same. There's room for improvement in this page (like only loading data for the jobs being shown, instead of loading all available jobs and sorting them), but this should bring performance on par with the 2.2 version. Author: Marcelo Vanzin Closes #20644 from vanzin/SPARK-23470. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ba77ed9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ba77ed9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ba77ed9 Branch: refs/heads/master Commit: 2ba77ed9e51922303e3c3533e368b95788bd7de5 Parents: 3e48f3b Author: Marcelo Vanzin Authored: Tue Feb 20 17:54:06 2018 -0800 Committer: Sameer Agarwal Committed: Tue Feb 20 17:54:06 2018 -0800 -- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2ba77ed9/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala -- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index a9265d4..ac83de1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -1048,7 +1048,7 @@ private[ui] object ApiHelper { } def lastStageNameAndDescription(store: AppStatusStore, job: JobData): (String, String) = { -val stage = store.asOption(store.lastStageAttempt(job.stageIds.max)) +val stage = store.asOption(store.stageAttempt(job.stageIds.max, 0)) (stage.map(_.name).getOrElse(""), stage.flatMap(_.description).getOrElse(job.name)) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23434][SQL] Spark should not warn `metadata directory` for a HDFS file path
Repository: spark Updated Branches: refs/heads/master 83c008762 -> 3e48f3b9e [SPARK-23434][SQL] Spark should not warn `metadata directory` for a HDFS file path ## What changes were proposed in this pull request? In a kerberized cluster, when Spark reads a file path (e.g. `people.json`), it warns with a wrong warning message during looking up `people.json/_spark_metadata`. The root cause of this situation is the difference between `LocalFileSystem` and `DistributedFileSystem`. `LocalFileSystem.exists()` returns `false`, but `DistributedFileSystem.exists` raises `org.apache.hadoop.security.AccessControlException`. ```scala scala> spark.version res0: String = 2.4.0-SNAPSHOT scala> spark.read.json("file:///usr/hdp/current/spark-client/examples/src/main/resources/people.json").show ++---+ | age| name| ++---+ |null|Michael| | 30| Andy| | 19| Justin| ++---+ scala> spark.read.json("hdfs:///tmp/people.json") 18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for metadata directory. 18/02/15 05:00:48 WARN streaming.FileStreamSink: Error while looking for metadata directory. ``` After this PR, ```scala scala> spark.read.json("hdfs:///tmp/people.json").show ++---+ | age| name| ++---+ |null|Michael| | 30| Andy| | 19| Justin| ++---+ ``` ## How was this patch tested? Manual. Author: Dongjoon Hyun Closes #20616 from dongjoon-hyun/SPARK-23434. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e48f3b9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e48f3b9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e48f3b9 Branch: refs/heads/master Commit: 3e48f3b9ee7645e4218ad3ff7559e578d4bd9667 Parents: 83c0087 Author: Dongjoon Hyun Authored: Tue Feb 20 16:02:44 2018 -0800 Committer: Shixiong Zhu Committed: Tue Feb 20 16:02:44 2018 -0800 -- .../spark/sql/execution/streaming/FileStreamSink.scala | 8 +--- 1 file changed, 5 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3e48f3b9/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala index 2715fa9..87a17ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala @@ -42,9 +42,11 @@ object FileStreamSink extends Logging { try { val hdfsPath = new Path(singlePath) val fs = hdfsPath.getFileSystem(hadoopConf) - val metadataPath = new Path(hdfsPath, metadataDir) - val res = fs.exists(metadataPath) - res + if (fs.isDirectory(hdfsPath)) { +fs.exists(new Path(hdfsPath, metadataDir)) + } else { +false + } } catch { case NonFatal(e) => logWarning(s"Error while looking for metadata directory.") - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
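Below is a simplified, self-contained version of the check after this patch, showing the shape of the logic rather than the exact `FileStreamSink` code; the warning/logging in the real method is reduced here to returning `false`.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import scala.util.control.NonFatal

// Only probe for the `_spark_metadata` directory when the input path is itself a
// directory, so reading a plain file on a kerberized HDFS no longer triggers the
// misleading "Error while looking for metadata directory" warning.
def looksLikeFileStreamSinkOutput(pathString: String, hadoopConf: Configuration): Boolean =
  try {
    val path = new Path(pathString)
    val fs = path.getFileSystem(hadoopConf)
    if (fs.isDirectory(path)) fs.exists(new Path(path, "_spark_metadata")) else false
  } catch {
    case NonFatal(_) => false   // the real code logs a warning before returning false
  }
```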
svn commit: r25169 - in /dev/spark/2.4.0-SNAPSHOT-2018_02_20_12_01-83c0087-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Feb 20 20:15:42 2018
New Revision: 25169
Log: Apache Spark 2.4.0-SNAPSHOT-2018_02_20_12_01-83c0087 docs

[This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23456][SPARK-21783] Turn on `native` ORC impl and PPD by default
Repository: spark Updated Branches: refs/heads/master 189f56f3d -> 83c008762 [SPARK-23456][SPARK-21783] Turn on `native` ORC impl and PPD by default ## What changes were proposed in this pull request? Apache Spark 2.3 introduced `native` ORC supports with vectorization and many fixes. However, it's shipped as a not-default option. This PR enables `native` ORC implementation and predicate-pushdown by default for Apache Spark 2.4. We will improve and stabilize ORC data source before Apache Spark 2.4. And, eventually, Apache Spark will drop old Hive-based ORC code. ## How was this patch tested? Pass the Jenkins with existing tests. Author: Dongjoon Hyun Closes #20634 from dongjoon-hyun/SPARK-23456. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/83c00876 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/83c00876 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/83c00876 Branch: refs/heads/master Commit: 83c008762af444eef73d835eb6f506ecf5aebc17 Parents: 189f56f Author: Dongjoon Hyun Authored: Tue Feb 20 09:14:56 2018 -0800 Committer: gatorsmile Committed: Tue Feb 20 09:14:56 2018 -0800 -- docs/sql-programming-guide.md | 6 +- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- 2 files changed, 8 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/83c00876/docs/sql-programming-guide.md -- diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 91e4367..c37c338 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1018,7 +1018,7 @@ the vectorized reader is used when `spark.sql.hive.convertMetastoreOrc` is also spark.sql.orc.impl hive -The name of ORC implementation. It can be one of native and hive. native means the native ORC support that is built on Apache ORC 1.4.1. `hive` means the ORC library in Hive 1.2.1. +The name of ORC implementation. It can be one of native and hive. native means the native ORC support that is built on Apache ORC 1.4. `hive` means the ORC library in Hive 1.2.1. spark.sql.orc.enableVectorizedReader @@ -1797,6 +1797,10 @@ working with timestamps in `pandas_udf`s to get the best performance, see # Migration Guide +## Upgrading From Spark SQL 2.3 to 2.4 + + - Since Spark 2.4, Spark maximizes the usage of a vectorized ORC reader for ORC files by default. To do that, `spark.sql.orc.impl` and `spark.sql.orc.filterPushdown` change their default values to `native` and `true` respectively. + ## Upgrading From Spark SQL 2.2 to 2.3 - Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`. 
http://git-wip-us.apache.org/repos/asf/spark/blob/83c00876/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e75e1d6..ce3f946 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -399,11 +399,11 @@ object SQLConf { val ORC_IMPLEMENTATION = buildConf("spark.sql.orc.impl") .doc("When native, use the native version of ORC support instead of the ORC library in Hive " + - "1.2.1. It is 'hive' by default.") + "1.2.1. It is 'hive' by default prior to Spark 2.4.") .internal() .stringConf .checkValues(Set("hive", "native")) -.createWithDefault("hive") +.createWithDefault("native") val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader") .doc("Enables vectorized orc decoding.") @@ -426,7 +426,7 @@ object SQLConf { val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") .doc("When true, enable filter pushdown for ORC files.") .booleanConf -.createWithDefault(false) +.createWithDefault(true) val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath") .doc("When true, check all the partition path
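For users who want the old defaults on 2.4, or the new behavior on 2.3, the same keys can be toggled per session. A short sketch follows; the ORC path and the `id` column are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("orc-native-sketch").getOrCreate()

// These are the new defaults in Spark 2.4; setting them explicitly is only needed on 2.3,
// or to switch back to "hive" / false on 2.4.
spark.conf.set("spark.sql.orc.impl", "native")          // vectorized reader built on Apache ORC 1.4
spark.conf.set("spark.sql.orc.filterPushdown", "true")  // push predicates down into ORC

val df = spark.read.orc("/tmp/example_orc")             // hypothetical path
df.filter("id > 100").show()                            // hypothetical column; eligible for pushdown
```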
svn commit: r25166 - in /dev/spark/2.4.0-SNAPSHOT-2018_02_20_08_01-189f56f-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Feb 20 16:18:11 2018
New Revision: 25166
Log: Apache Spark 2.4.0-SNAPSHOT-2018_02_20_08_01-189f56f docs

[This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23383][BUILD][MINOR] Make a distribution should exit with usage while detecting wrong options
Repository: spark Updated Branches: refs/heads/master 862fa697d -> 189f56f3d [SPARK-23383][BUILD][MINOR] Make a distribution should exit with usage while detecting wrong options ## What changes were proposed in this pull request? ```shell ./dev/make-distribution.sh --name ne-1.0.0-SNAPSHOT xyz --tgz -Phadoop-2.7 +++ dirname ./dev/make-distribution.sh ++ cd ./dev/.. ++ pwd + SPARK_HOME=/Users/Kent/Documents/spark + DISTDIR=/Users/Kent/Documents/spark/dist + MAKE_TGZ=false + MAKE_PIP=false + MAKE_R=false + NAME=none + MVN=/Users/Kent/Documents/spark/build/mvn + (( 5 )) + case $1 in + NAME=ne-1.0.0-SNAPSHOT + shift + shift + (( 3 )) + case $1 in + break + '[' -z /Users/Kent/.jenv/candidates/java/current ']' + '[' -z /Users/Kent/.jenv/candidates/java/current ']' ++ command -v git + '[' /usr/local/bin/git ']' ++ git rev-parse --short HEAD + GITREV=98ea6a7 + '[' '!' -z 98ea6a7 ']' + GITREVSTRING=' (git revision 98ea6a7)' + unset GITREV ++ command -v /Users/Kent/Documents/spark/build/mvn + '[' '!' /Users/Kent/Documents/spark/build/mvn ']' ++ /Users/Kent/Documents/spark/build/mvn help:evaluate -Dexpression=project.version xyz --tgz -Phadoop-2.7 ++ grep -v INFO ++ tail -n 1 + VERSION=' -X,--debug Produce execution debug output' ``` It is better to declare the mistakes and exit with usage than `break` ## How was this patch tested? manually cc srowen Author: Kent Yao Closes #20571 from yaooqinn/SPARK-23383. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/189f56f3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/189f56f3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/189f56f3 Branch: refs/heads/master Commit: 189f56f3dcdad4d997248c01aa5490617f018bd0 Parents: 862fa69 Author: Kent Yao Authored: Tue Feb 20 07:51:30 2018 -0600 Committer: Sean Owen Committed: Tue Feb 20 07:51:30 2018 -0600 -- dev/make-distribution.sh | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/189f56f3/dev/make-distribution.sh -- diff --git a/dev/make-distribution.sh b/dev/make-distribution.sh index 8b02446..84233c6 100755 --- a/dev/make-distribution.sh +++ b/dev/make-distribution.sh @@ -72,9 +72,17 @@ while (( "$#" )); do --help) exit_with_usage ;; -*) +--*) + echo "Error: $1 is not supported" + exit_with_usage + ;; +-*) break ;; +*) + echo "Error: $1 is not supported" + exit_with_usage + ;; esac shift done - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r25160 - in /dev/spark/2.4.0-SNAPSHOT-2018_02_20_04_01-862fa69-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Feb 20 12:18:19 2018
New Revision: 25160
Log: Apache Spark 2.4.0-SNAPSHOT-2018_02_20_04_01-862fa69 docs

[This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23240][PYTHON] Better error message when extraneous data in pyspark.daemon's stdout
Repository: spark Updated Branches: refs/heads/master aadf9535b -> 862fa697d [SPARK-23240][PYTHON] Better error message when extraneous data in pyspark.daemon's stdout ## What changes were proposed in this pull request? Print more helpful message when daemon module's stdout is empty or contains a bad port number. ## How was this patch tested? Manually recreated the environmental issues that caused the mysterious exceptions at one site. Tested that the expected messages are logged. Also, ran all scala unit tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Bruce Robbins Closes #20424 from bersprockets/SPARK-23240_prop2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/862fa697 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/862fa697 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/862fa697 Branch: refs/heads/master Commit: 862fa697d829cdddf0f25e5613c91b040f9d9652 Parents: aadf953 Author: Bruce Robbins Authored: Tue Feb 20 20:26:26 2018 +0900 Committer: hyukjinkwon Committed: Tue Feb 20 20:26:26 2018 +0900 -- .../spark/api/python/PythonWorkerFactory.scala | 29 ++-- 1 file changed, 26 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/862fa697/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 30976ac..2340580 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -17,7 +17,7 @@ package org.apache.spark.api.python -import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter} +import java.io.{DataInputStream, DataOutputStream, EOFException, InputStream, OutputStreamWriter} import java.net.{InetAddress, ServerSocket, Socket, SocketException} import java.nio.charset.StandardCharsets import java.util.Arrays @@ -182,7 +182,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String try { // Create and start the daemon -val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule)) +val command = Arrays.asList(pythonExec, "-m", daemonModule) +val pb = new ProcessBuilder(command) val workerEnv = pb.environment() workerEnv.putAll(envVars.asJava) workerEnv.put("PYTHONPATH", pythonPath) @@ -191,7 +192,29 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String daemon = pb.start() val in = new DataInputStream(daemon.getInputStream) -daemonPort = in.readInt() +try { + daemonPort = in.readInt() +} catch { + case _: EOFException => +throw new SparkException(s"No port number in $daemonModule's stdout") +} + +// test that the returned port number is within a valid range. +// note: this does not cover the case where the port number +// is arbitrary data but is also coincidentally within range +if (daemonPort < 1 || daemonPort > 0x) { + val exceptionMessage = f""" +|Bad data in $daemonModule's standard output. 
Invalid port number: +| $daemonPort (0x$daemonPort%08x) +|Python command to execute the daemon was: +| ${command.asScala.mkString(" ")} +|Check that you don't have any unexpected modules or libraries in +|your PYTHONPATH: +| $pythonPath +|Also, check if you have a sitecustomize.py module in your python path, +|or in your python installation, that is printing to standard output""" + throw new SparkException(exceptionMessage.stripMargin) +} // Redirect daemon stdout and stderr redirectStreamsToStderr(in, daemon.getErrorStream) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
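A condensed sketch of the stricter handshake is shown below, with `SparkException` replaced by a plain runtime exception and the error text shortened; the real method also redirects the daemon's streams and includes the full Python command line in the message.

```scala
import java.io.{DataInputStream, EOFException, InputStream}

def readDaemonPort(daemonStdout: InputStream, daemonModule: String, pythonPath: String): Int = {
  val in = new DataInputStream(daemonStdout)
  // An empty stdout (the daemon died or printed nothing) used to surface as a bare EOFException.
  val port =
    try in.readInt()
    catch {
      case _: EOFException =>
        throw new RuntimeException(s"No port number in $daemonModule's stdout")
    }
  // Arbitrary text on stdout (e.g. from sitecustomize.py or a chatty module on PYTHONPATH)
  // can decode to an out-of-range integer; reject it with a descriptive message.
  if (port < 1 || port > 0xffff) {
    throw new RuntimeException(
      f"Bad data in $daemonModule's standard output. Invalid port number: $port (0x$port%08x). " +
        s"Check your PYTHONPATH ($pythonPath) for modules that print to standard output.")
  }
  port
}
```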
svn commit: r25156 - in /dev/spark/2.4.0-SNAPSHOT-2018_02_20_00_01-651b027-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Feb 20 08:17:45 2018
New Revision: 25156
Log: Apache Spark 2.4.0-SNAPSHOT-2018_02_20_00_01-651b027 docs

[This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]

- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23203][SQL] DataSourceV2: Use immutable logical plans.
Repository: spark Updated Branches: refs/heads/master 651b0277f -> aadf9535b [SPARK-23203][SQL] DataSourceV2: Use immutable logical plans. ## What changes were proposed in this pull request? SPARK-23203: DataSourceV2 should use immutable catalyst trees instead of wrapping a mutable DataSourceV2Reader. This commit updates DataSourceV2Relation and consolidates much of the DataSourceV2 API requirements for the read path in it. Instead of wrapping a reader that changes, the relation lazily produces a reader from its configuration. This commit also updates the predicate and projection push-down. Instead of the implementation from SPARK-22197, this reuses the rule matching from the Hive and DataSource read paths (using `PhysicalOperation`) and copies most of the implementation of `SparkPlanner.pruneFilterProject`, with updates for DataSourceV2. By reusing the implementation from other read paths, this should have fewer regressions from other read paths and is less code to maintain. The new push-down rules also supports the following edge cases: * The output of DataSourceV2Relation should be what is returned by the reader, in case the reader can only partially satisfy the requested schema projection * The requested projection passed to the DataSourceV2Reader should include filter columns * The push-down rule may be run more than once if filters are not pushed through projections ## How was this patch tested? Existing push-down and read tests. Author: Ryan Blue Closes #20387 from rdblue/SPARK-22386-push-down-immutable-trees. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aadf9535 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aadf9535 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aadf9535 Branch: refs/heads/master Commit: aadf9535b4a11b42fd9d72f636576d2da0766199 Parents: 651b027 Author: Ryan Blue Authored: Tue Feb 20 16:04:22 2018 +0800 Committer: Wenchen Fan Committed: Tue Feb 20 16:04:22 2018 +0800 -- .../kafka010/KafkaContinuousSourceSuite.scala | 19 +- .../sql/kafka010/KafkaContinuousTest.scala | 4 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 4 +- .../org/apache/spark/sql/DataFrameReader.scala | 41 +--- .../datasources/v2/DataSourceV2Relation.scala | 212 +-- .../datasources/v2/DataSourceV2Strategy.scala | 7 +- .../v2/PushDownOperatorsToDataSource.scala | 159 -- .../continuous/ContinuousExecution.scala| 2 +- .../sql/sources/v2/DataSourceV2Suite.scala | 2 +- .../apache/spark/sql/streaming/StreamTest.scala | 6 +- 10 files changed, 269 insertions(+), 187 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aadf9535/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala -- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index a7083fa..f679e9b 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -17,20 +17,9 @@ package org.apache.spark.sql.kafka010 -import java.util.Properties -import java.util.concurrent.atomic.AtomicInteger - -import org.scalatest.time.SpanSugar._ -import scala.collection.mutable -import scala.util.Random - -import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, 
Dataset, ForeachWriter, Row} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.streaming.StreamExecution -import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution -import org.apache.spark.sql.streaming.{StreamTest, Trigger} -import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} +import org.apache.spark.sql.Dataset +import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation +import org.apache.spark.sql.streaming.Trigger // Run tests in KafkaSourceSuiteBase in continuous execution mode. class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest @@ -71,7 +60,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { eventually(timeout(streamingTimeout)) { assert( query.lastExecution.logical.collectFirst { - case DataSourceV2Relation(_, r: KafkaContinuousReader) => r + case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r }.exists { r =>
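Although the diff is cut off above, the design direction is summarized in the description: the relation is an immutable plan node built from configuration, and readers are derived from it on demand. The sketch below only illustrates that shape, with simplified stand-in types rather than Spark's actual classes.

```scala
// Simplified stand-in types; not Spark's actual Reader or relation classes.
trait Reader { def readSchema(): Seq[String] }

final case class RelationSketch(
    sourceName: String,
    options: Map[String, String],
    projection: Seq[String],   // columns the reader will be asked to produce
    filters: Seq[String],      // predicates pushed to the source (as strings here)
    mkReader: (Map[String, String], Seq[String], Seq[String]) => Reader) {

  // A fresh reader is produced on demand from the immutable configuration; nothing in
  // the plan node itself ever mutates while optimizer rules run.
  def newReader(): Reader = mkReader(options, projection, filters)

  // Push-down rules return new nodes, so they can safely run more than once.
  def withPushedFilters(extra: Seq[String]): RelationSketch = copy(filters = filters ++ extra)
  def withProjection(columns: Seq[String]): RelationSketch = copy(projection = columns)
}
```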