svn commit: r29758 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_27_20_02-3b7395f-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Fri Sep 28 03:16:57 2018 New Revision: 29758 Log: Apache Spark 2.5.0-SNAPSHOT-2018_09_27_20_02-3b7395f docs [This commit notification would consist of 1485 parts, which exceeds the limit of 50, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25459][SQL] Add viewOriginalText back to CatalogTable
Repository: spark Updated Branches: refs/heads/master 5fd22d053 -> 3b7395fe0 [SPARK-25459][SQL] Add viewOriginalText back to CatalogTable ## What changes were proposed in this pull request? The `show create table` will show a lot of generated attributes for views that created by older Spark version. This PR will basically revert https://issues.apache.org/jira/browse/SPARK-19272 back, so when you `DESC [FORMATTED|EXTENDED] view` will show the original view DDL text. ## How was this patch tested? Unit test. Closes #22458 from zheyuan28/testbranch. Lead-authored-by: Chris Zhao Co-authored-by: Christopher Zhao Co-authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b7395fe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b7395fe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b7395fe Branch: refs/heads/master Commit: 3b7395fe025a4c9a591835e53ac6ca05be6868f1 Parents: 5fd22d0 Author: Chris Zhao Authored: Thu Sep 27 17:55:08 2018 -0700 Committer: Dongjoon Hyun Committed: Thu Sep 27 17:55:08 2018 -0700 -- .../spark/sql/catalyst/catalog/interface.scala | 4 +++- .../spark/sql/execution/command/views.scala | 2 ++ .../sql-tests/results/describe.sql.out | 2 ++ .../spark/sql/hive/client/HiveClientImpl.scala | 9 +--- .../spark/sql/hive/execution/HiveDDLSuite.scala | 22 5 files changed, 35 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3b7395fe/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 30ded13..817abeb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ 
-244,7 +244,8 @@ case class CatalogTable( unsupportedFeatures: Seq[String] = Seq.empty, tracksPartitionsInCatalog: Boolean = false, schemaPreservesCase: Boolean = true, -ignoredProperties: Map[String, String] = Map.empty) { +ignoredProperties: Map[String, String] = Map.empty, +viewOriginalText: Option[String] = None) { import CatalogTable._ @@ -331,6 +332,7 @@ case class CatalogTable( comment.foreach(map.put("Comment", _)) if (tableType == CatalogTableType.VIEW) { viewText.foreach(map.put("View Text", _)) + viewOriginalText.foreach(map.put("View Original Text", _)) viewDefaultDatabase.foreach(map.put("View Default Database", _)) if (viewQueryColumnNames.nonEmpty) { map.put("View Query Output Columns", viewQueryColumnNames.mkString("[", ", ", "]")) http://git-wip-us.apache.org/repos/asf/spark/blob/3b7395fe/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 5172f32..cd34dfa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -242,6 +242,7 @@ case class CreateViewCommand( storage = CatalogStorageFormat.empty, schema = aliasPlan(session, analyzedPlan).schema, properties = newProperties, + viewOriginalText = originalText, viewText = originalText, comment = comment ) @@ -299,6 +300,7 @@ case class AlterViewAsCommand( val updatedViewMeta = viewMeta.copy( schema = analyzedPlan.schema, properties = newProperties, + viewOriginalText = Some(originalText), viewText = Some(originalText)) session.sessionState.catalog.alterTable(updatedViewMeta) http://git-wip-us.apache.org/repos/asf/spark/blob/3b7395fe/sql/core/src/test/resources/sql-tests/results/describe.sql.out -- diff --git a/sql/core/src/test/resources/sql-tests/results/describe.sql.out 
b/sql/core/src/test/resources/sql-tests/results/describe.sql.out index 79390cb..9c4b70d 100644 --- a/sql/core/src/test/resources/sql-tests/results/describe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/describe.sql.out @@ -474,6 +474,7 @@ Last Access [not included in comparison] Created By [not included in comparison] Type VIEW View Text SELECT *
svn commit: r29751 - in /dev/spark/2.3.3-SNAPSHOT-2018_09_27_14_03-f13565b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Sep 27 21:19:30 2018 New Revision: 29751 Log: Apache Spark 2.3.3-SNAPSHOT-2018_09_27_14_03-f13565b docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r29750 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_27_14_03-a43a082-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Sep 27 21:18:34 2018 New Revision: 29750 Log: Apache Spark 2.4.1-SNAPSHOT-2018_09_27_14_03-a43a082 docs [This commit notification would consist of 1472 parts, which exceeds the limit of 50, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r29749 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_27_12_02-5fd22d0-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Sep 27 19:17:11 2018 New Revision: 29749 Log: Apache Spark 2.5.0-SNAPSHOT-2018_09_27_12_02-5fd22d0 docs [This commit notification would consist of 1485 parts, which exceeds the limit of 50, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25533][CORE][WEBUI] AppSummary should hold the information about succeeded Jobs and completed stages only
Repository: spark Updated Branches: refs/heads/branch-2.3 f40e4c71c -> f13565b6e [SPARK-25533][CORE][WEBUI] AppSummary should hold the information about succeeded Jobs and completed stages only Currently, In the spark UI, when there are failed jobs or failed stages, display message for the completed jobs and completed stages are not consistent with the previous versions of spark. Reason is because, AppSummary holds the information about all the jobs and stages. But, In the below code, it checks against the completedJobs and completedStages. So, AppSummary should hold only successful jobs and stages. https://github.com/apache/spark/blob/66d29870c09e6050dd846336e596faaa8b0d14ad/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala#L306 https://github.com/apache/spark/blob/66d29870c09e6050dd846336e596faaa8b0d14ad/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala#L119 So, we should keep only completed jobs and stage information in the AppSummary, to make it consistent with Spark2.2 Test steps: bin/spark-shell ``` sc.parallelize(1 to 5, 5).collect() sc.parallelize(1 to 5, 2).map{ x => throw new RuntimeException("Fail")}.collect() ``` **Before fix:** ![screenshot from 2018-09-26 03-24-53](https://user-images.githubusercontent.com/23054875/46045669-f60bcd80-c13b-11e8-9aa6-a2e5a2038dba.png) ![screenshot from 2018-09-26 03-25-08](https://user-images.githubusercontent.com/23054875/46045699-0ae86100-c13c-11e8-94e5-ad35944c7615.png) **After fix:** ![screenshot from 2018-09-26 03-16-14](https://user-images.githubusercontent.com/23054875/46045636-d83e6880-c13b-11e8-98df-f49d15c18958.png) ![screenshot from 2018-09-26 03-16-28](https://user-images.githubusercontent.com/23054875/46045645-e1c7d080-c13b-11e8-8c9c-d32e1f663356.png) Closes #22549 from shahidki31/SPARK-25533. 
Authored-by: Shahid Signed-off-by: Marcelo Vanzin (cherry picked from commit 5ee21661834e837d414bc20591982a092c0aece3) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f13565b6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f13565b6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f13565b6 Branch: refs/heads/branch-2.3 Commit: f13565b6ec2de2e3304b42de3a2e61da6a8ff3b0 Parents: f40e4c7 Author: Shahid Authored: Wed Sep 26 10:47:49 2018 -0700 Committer: Marcelo Vanzin Committed: Thu Sep 27 10:34:21 2018 -0700 -- .../org/apache/spark/status/AppStatusListener.scala | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f13565b6/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index 5bea7df..d57c977 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -334,10 +334,11 @@ private[spark] class AppStatusListener( job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None update(job, now, last = true) + if (job.status == JobExecutionStatus.SUCCEEDED) { +appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages) +kvstore.write(appSummary) + } } - -appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages) -kvstore.write(appSummary) } override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { @@ -592,10 +593,11 @@ private[spark] class AppStatusListener( if (removeStage) { liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)) } + if (stage.status == v1.StageStatus.COMPLETE) { +appSummary = new 
AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1) +kvstore.write(appSummary) + } } - -appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1) -kvstore.write(appSummary) } override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25533][CORE][WEBUI] AppSummary should hold the information about succeeded Jobs and completed stages only
Repository: spark Updated Branches: refs/heads/branch-2.4 0256f8a09 -> a43a082e0 [SPARK-25533][CORE][WEBUI] AppSummary should hold the information about succeeded Jobs and completed stages only Currently, In the spark UI, when there are failed jobs or failed stages, display message for the completed jobs and completed stages are not consistent with the previous versions of spark. Reason is because, AppSummary holds the information about all the jobs and stages. But, In the below code, it checks against the completedJobs and completedStages. So, AppSummary should hold only successful jobs and stages. https://github.com/apache/spark/blob/66d29870c09e6050dd846336e596faaa8b0d14ad/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala#L306 https://github.com/apache/spark/blob/66d29870c09e6050dd846336e596faaa8b0d14ad/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala#L119 So, we should keep only completed jobs and stage information in the AppSummary, to make it consistent with Spark2.2 Test steps: bin/spark-shell ``` sc.parallelize(1 to 5, 5).collect() sc.parallelize(1 to 5, 2).map{ x => throw new RuntimeException("Fail")}.collect() ``` **Before fix:** ![screenshot from 2018-09-26 03-24-53](https://user-images.githubusercontent.com/23054875/46045669-f60bcd80-c13b-11e8-9aa6-a2e5a2038dba.png) ![screenshot from 2018-09-26 03-25-08](https://user-images.githubusercontent.com/23054875/46045699-0ae86100-c13c-11e8-94e5-ad35944c7615.png) **After fix:** ![screenshot from 2018-09-26 03-16-14](https://user-images.githubusercontent.com/23054875/46045636-d83e6880-c13b-11e8-98df-f49d15c18958.png) ![screenshot from 2018-09-26 03-16-28](https://user-images.githubusercontent.com/23054875/46045645-e1c7d080-c13b-11e8-8c9c-d32e1f663356.png) Closes #22549 from shahidki31/SPARK-25533. 
Authored-by: Shahid Signed-off-by: Marcelo Vanzin (cherry picked from commit 5ee21661834e837d414bc20591982a092c0aece3) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a43a082e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a43a082e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a43a082e Branch: refs/heads/branch-2.4 Commit: a43a082e0a9fe123e5705f4b0f73483c10c1ad9e Parents: 0256f8a Author: Shahid Authored: Wed Sep 26 10:47:49 2018 -0700 Committer: Marcelo Vanzin Committed: Thu Sep 27 10:24:14 2018 -0700 -- .../org/apache/spark/status/AppStatusListener.scala | 14 -- 1 file changed, 8 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a43a082e/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala index b9f604b..513c929 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala @@ -374,10 +374,11 @@ private[spark] class AppStatusListener( job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None update(job, now, last = true) + if (job.status == JobExecutionStatus.SUCCEEDED) { +appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages) +kvstore.write(appSummary) + } } - -appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages) -kvstore.write(appSummary) } override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { @@ -639,10 +640,11 @@ private[spark] class AppStatusListener( if (removeStage) { liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)) } + if (stage.status == v1.StageStatus.COMPLETE) { +appSummary = new 
AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1) +kvstore.write(appSummary) + } } - -appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1) -kvstore.write(appSummary) } private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, now: Long) = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r29747 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_27_10_03-0256f8a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Sep 27 17:17:35 2018 New Revision: 29747 Log: Apache Spark 2.4.1-SNAPSHOT-2018_09_27_10_03-0256f8a docs [This commit notification would consist of 1472 parts, which exceeds the limit of 50, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25546][CORE] Don't cache value of EVENT_LOG_CALLSITE_LONG_FORM.
Repository: spark Updated Branches: refs/heads/branch-2.4 659ecb54a -> 0256f8a09 [SPARK-25546][CORE] Don't cache value of EVENT_LOG_CALLSITE_LONG_FORM. Caching the value of that config means different instances of SparkEnv will always use whatever was the first value to be read. It also breaks tests that use RDDInfo outside of the scope of a SparkContext. Since this is not a performance sensitive area, there's no advantage in caching the config value. Closes #22558 from vanzin/SPARK-25546. Authored-by: Marcelo Vanzin Signed-off-by: Dongjoon Hyun (cherry picked from commit 5fd22d05363dd8c0e1b10f3822ccb71eb42f6db9) Signed-off-by: Dongjoon Hyun Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0256f8a0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0256f8a0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0256f8a0 Branch: refs/heads/branch-2.4 Commit: 0256f8a0973c2fc8815fa710670dbe68317335b5 Parents: 659ecb5 Author: Marcelo Vanzin Authored: Thu Sep 27 09:26:50 2018 -0700 Committer: Dongjoon Hyun Committed: Thu Sep 27 09:27:05 2018 -0700 -- core/src/main/scala/org/apache/spark/storage/RDDInfo.scala | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0256f8a0/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 19f8656..917cfab 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -55,11 +55,13 @@ class RDDInfo( } private[spark] object RDDInfo { - private val callsiteLongForm = SparkEnv.get.conf.get(EVENT_LOG_CALLSITE_LONG_FORM) - def fromRdd(rdd: RDD[_]): RDDInfo = { val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd)) val parentIds = rdd.dependencies.map(_.rdd.id) +val 
callsiteLongForm = Option(SparkEnv.get) + .map(_.conf.get(EVENT_LOG_CALLSITE_LONG_FORM)) + .getOrElse(false) + val callSite = if (callsiteLongForm) { rdd.creationSite.longForm } else { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r29741 - in /dev/spark/v2.4.0-rc2-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _site/api/java/org/apache/spark
Author: wenchen Date: Thu Sep 27 15:45:37 2018 New Revision: 29741 Log: Apache Spark v2.4.0-rc2 docs [This commit notification would consist of 1474 parts, which exceeds the limit of 50, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r29740 - /dev/spark/v2.4.0-rc2-bin/
Author: wenchen Date: Thu Sep 27 15:24:41 2018 New Revision: 29740 Log: Apache Spark v2.4.0-rc2 Added: dev/spark/v2.4.0-rc2-bin/ dev/spark/v2.4.0-rc2-bin/SparkR_2.4.0.tar.gz (with props) dev/spark/v2.4.0-rc2-bin/SparkR_2.4.0.tar.gz.asc dev/spark/v2.4.0-rc2-bin/SparkR_2.4.0.tar.gz.sha512 dev/spark/v2.4.0-rc2-bin/pyspark-2.4.0.tar.gz (with props) dev/spark/v2.4.0-rc2-bin/pyspark-2.4.0.tar.gz.asc dev/spark/v2.4.0-rc2-bin/pyspark-2.4.0.tar.gz.sha512 dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-hadoop2.6.tgz (with props) dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-hadoop2.6.tgz.asc dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-hadoop2.6.tgz.sha512 dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-hadoop2.7.tgz (with props) dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-hadoop2.7.tgz.asc dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-hadoop2.7.tgz.sha512 dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-without-hadoop-scala-2.12.tgz (with props) dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-without-hadoop-scala-2.12.tgz.asc dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-without-hadoop-scala-2.12.tgz.sha512 dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-without-hadoop.tgz (with props) dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-without-hadoop.tgz.asc dev/spark/v2.4.0-rc2-bin/spark-2.4.0-bin-without-hadoop.tgz.sha512 dev/spark/v2.4.0-rc2-bin/spark-2.4.0.tgz (with props) dev/spark/v2.4.0-rc2-bin/spark-2.4.0.tgz.asc dev/spark/v2.4.0-rc2-bin/spark-2.4.0.tgz.sha512 Added: dev/spark/v2.4.0-rc2-bin/SparkR_2.4.0.tar.gz == Binary file - no diff available. 
Propchange: dev/spark/v2.4.0-rc2-bin/SparkR_2.4.0.tar.gz -- svn:mime-type = application/octet-stream Added: dev/spark/v2.4.0-rc2-bin/SparkR_2.4.0.tar.gz.asc == --- dev/spark/v2.4.0-rc2-bin/SparkR_2.4.0.tar.gz.asc (added) +++ dev/spark/v2.4.0-rc2-bin/SparkR_2.4.0.tar.gz.asc Thu Sep 27 15:24:41 2018 @@ -0,0 +1,17 @@ +-BEGIN PGP SIGNATURE- +Version: GnuPG v1 + +iQIcBAABAgAGBQJbrPJsAAoJEGuscolPT9yK8mYQALZ5gaQNIIXDkixdlRpkSI5F +tMcIShp/CBsyJrfJ6qjfds/DcY6XJwoKrIbZMOT8NJtc3eBSuW2KPCBwHrC/GnqC +WpDc8UUq4cxjiXP0x8iYWU87vJPS25jETwPv1qBoDJWkmt1Ks5WgudYdGw7bYqvf +s+O/ndgMrUBmTNQ9LWi4YKXOWzrw6++j5zO6QoNNjiNPMLlGkyd5D7Fcj2fAEdZf +lImF6FcOYQGxlpZOyIzaLjSpcfrFmdNvTqfojOTt+pfEPyleFdMkCFSXxdM++jng +rppv64o3d3f3abGnFCdtrJJ+QGBzYz5h9kHEG30kIwSG5TQCeR4A9Iiv8O8OI0+A +2XvplokKJA7RB6SbrYReVBbjU8IRN9SLpWpeEq664RHSnaqNL87FDk4n8BACGP5T +U7Psmz5fEXgWadCqmiu/2sN9GsdUU+p7qm+Y7xuZ1+LaJ68gLjTzGxAccc5C5WHz +dZA49f7Sde34NxKoycAwjsQSNI6H5vZNMdtuFB4iwvUoN1UCZL2AhfqbTAqhIFfn +Fq+A10D+VV09vaX5FFcqdgXiFUNQFbFbQLDgSNH4juxS1jwypKlrtbzRUPHOChsi +2vtUr7zVHrRl93khBKaeASD/Uhe/0QsZzVWYr6phFMsXhw8nbPdBoGGKe3K0M+8a +VIVE0npdlbWhFjMEyOtl +=wzRX +-END PGP SIGNATURE- Added: dev/spark/v2.4.0-rc2-bin/SparkR_2.4.0.tar.gz.sha512 == --- dev/spark/v2.4.0-rc2-bin/SparkR_2.4.0.tar.gz.sha512 (added) +++ dev/spark/v2.4.0-rc2-bin/SparkR_2.4.0.tar.gz.sha512 Thu Sep 27 15:24:41 2018 @@ -0,0 +1,3 @@ +SparkR_2.4.0.tar.gz: 0FA52FFC 2ECDF6CA C2E588AD 684ADD30 95C3C695 F90EF05F + 03BC141F 690F3BB5 D76D8FCC A7CBDFE4 9318D1E0 15EEB42A + 300219E9 6D33584B 26F438B7 5AA0554F Added: dev/spark/v2.4.0-rc2-bin/pyspark-2.4.0.tar.gz == Binary file - no diff available. 
Propchange: dev/spark/v2.4.0-rc2-bin/pyspark-2.4.0.tar.gz -- svn:mime-type = application/octet-stream Added: dev/spark/v2.4.0-rc2-bin/pyspark-2.4.0.tar.gz.asc == --- dev/spark/v2.4.0-rc2-bin/pyspark-2.4.0.tar.gz.asc (added) +++ dev/spark/v2.4.0-rc2-bin/pyspark-2.4.0.tar.gz.asc Thu Sep 27 15:24:41 2018 @@ -0,0 +1,17 @@ +-BEGIN PGP SIGNATURE- +Version: GnuPG v1 + +iQIcBAABAgAGBQJbrO/MAAoJEGuscolPT9yKdC0P/22mGoNJwz+7IhNtTPnYBHcl +2sbVsdtEX5ctIlM/gOyXW9Q/V4S6NzqGRWV8/NJazPILP31wAv6fdaahZAj0EOzA ++MQBphmOQ938ZEVM+Ui6DEelO1fg8o+F5Ei7hUxCU09X3LRHcb98cu5qBT/JJg3l +zJmFMhplz5NdLrkas/CagvScoV+7TmQ293aK71UxOQwI6ZaIUMQopP0NI7P+sl/Z +PAUA/plgYzOWuLqaWFZkDQnbydn7SDNurYbBNV+3QJCTUnWdWsnqroECp1p4FqKf +zR4e6iPOenitohGSHR8zjt59AlObOrUyPm79BbpHtlsj6XpU/o63M55eCL5eIBCu +KtN6uMdiAymBxKcE63Gnze00XI/PkoIBjRJwo3diDDeiAtwxgsB6PGQrKFvAeIu0 +iCMQ0XGMSZaXVckzz7Mo8e3EpOZ1c+cGZMKlEmhvTzjquKPlqZj9X5eINdGuiWSB +cMH/8XVRgyjF440WpVpOlc+cdYBfMelD3rLqq/A4kp8D2A61lM1lqzBvHqooPx8h +hsinuO4lGMMf7VUbgUHtVVvcPKlCID0tpKZEVsdyNHACaI5aL7qYx7DkFx73WpQ4 +elpBj/jQCsfxstxFjlH0lmYAYXFaNDAhsDj9AgcDgA0k3EfU563+RViODCPX/mvv +/kMv4NPK9F2RIdvs1DcM +=NoRl +-END PGP SIGNATURE- Added:
svn commit: r29739 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_27_08_02-a1adde5-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Sep 27 15:17:14 2018 New Revision: 29739 Log: Apache Spark 2.5.0-SNAPSHOT-2018_09_27_08_02-a1adde5 docs [This commit notification would consist of 1485 parts, which exceeds the limit of 50, so it has been shortened to this summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[2/2] spark git commit: Preparing development version 2.4.1-SNAPSHOT
Preparing development version 2.4.1-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/659ecb54 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/659ecb54 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/659ecb54 Branch: refs/heads/branch-2.4 Commit: 659ecb54ae0ec4c9a0e9513b295fb67d2146ba9c Parents: 42f25f3 Author: Wenchen Fan Authored: Thu Sep 27 14:31:03 2018 + Committer: Wenchen Fan Committed: Thu Sep 27 14:31:03 2018 + -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/kvstore/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml | 2 +- common/network-yarn/pom.xml| 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml| 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- external/avro/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml| 2 +- external/flume-sink/pom.xml| 2 +- external/flume/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml| 2 +- external/kafka-0-10/pom.xml| 2 +- external/kafka-0-8-assembly/pom.xml| 2 +- external/kafka-0-8/pom.xml | 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml| 2 +- graphx/pom.xml | 2 +- hadoop-cloud/pom.xml | 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml| 2 +- mllib/pom.xml | 2 +- pom.xml| 2 +- python/pyspark/version.py | 2 +- repl/pom.xml | 2 +- resource-managers/kubernetes/core/pom.xml | 2 +- resource-managers/kubernetes/integration-tests/pom.xml | 2 +- resource-managers/mesos/pom.xml| 2 +- resource-managers/yarn/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- 43 files changed, 44 insertions(+), 44 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/spark/blob/659ecb54/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index f52d785..714b6f1 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,6 +1,6 @@ Package: SparkR Type: Package -Version: 2.4.0 +Version: 2.4.1 Title: R Frontend for Apache Spark Description: Provides an R Frontend for Apache Spark. Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), http://git-wip-us.apache.org/repos/asf/spark/blob/659ecb54/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index 63ab510..ee0de73 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.4.0 +2.4.1-SNAPSHOT ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/659ecb54/common/kvstore/pom.xml -- diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml index b10e118..b89e0fe 100644 --- a/common/kvstore/pom.xml +++ b/common/kvstore/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.4.0 +2.4.1-SNAPSHOT ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/659ecb54/common/network-common/pom.xml -- diff --git a/common/network-common/pom.xml
[1/2] spark git commit: Preparing Spark release v2.4.0-rc2
Repository: spark Updated Branches: refs/heads/branch-2.4 3c78ea258 -> 659ecb54a Preparing Spark release v2.4.0-rc2 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42f25f30 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42f25f30 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42f25f30 Branch: refs/heads/branch-2.4 Commit: 42f25f309e91c8cde1814e3720099ac1e64783da Parents: 3c78ea2 Author: Wenchen Fan Authored: Thu Sep 27 14:30:59 2018 + Committer: Wenchen Fan Committed: Thu Sep 27 14:30:59 2018 + -- R/pkg/DESCRIPTION | 2 +- assembly/pom.xml | 2 +- common/kvstore/pom.xml | 2 +- common/network-common/pom.xml | 2 +- common/network-shuffle/pom.xml | 2 +- common/network-yarn/pom.xml| 2 +- common/sketch/pom.xml | 2 +- common/tags/pom.xml| 2 +- common/unsafe/pom.xml | 2 +- core/pom.xml | 2 +- docs/_config.yml | 4 ++-- examples/pom.xml | 2 +- external/avro/pom.xml | 2 +- external/docker-integration-tests/pom.xml | 2 +- external/flume-assembly/pom.xml| 2 +- external/flume-sink/pom.xml| 2 +- external/flume/pom.xml | 2 +- external/kafka-0-10-assembly/pom.xml | 2 +- external/kafka-0-10-sql/pom.xml| 2 +- external/kafka-0-10/pom.xml| 2 +- external/kafka-0-8-assembly/pom.xml| 2 +- external/kafka-0-8/pom.xml | 2 +- external/kinesis-asl-assembly/pom.xml | 2 +- external/kinesis-asl/pom.xml | 2 +- external/spark-ganglia-lgpl/pom.xml| 2 +- graphx/pom.xml | 2 +- hadoop-cloud/pom.xml | 2 +- launcher/pom.xml | 2 +- mllib-local/pom.xml| 2 +- mllib/pom.xml | 2 +- pom.xml| 2 +- python/pyspark/version.py | 2 +- repl/pom.xml | 2 +- resource-managers/kubernetes/core/pom.xml | 2 +- resource-managers/kubernetes/integration-tests/pom.xml | 2 +- resource-managers/mesos/pom.xml| 2 +- resource-managers/yarn/pom.xml | 2 +- sql/catalyst/pom.xml | 2 +- sql/core/pom.xml | 2 +- sql/hive-thriftserver/pom.xml | 2 +- sql/hive/pom.xml | 2 +- streaming/pom.xml | 2 +- tools/pom.xml | 2 +- 43 files changed, 44 insertions(+), 
44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/42f25f30/R/pkg/DESCRIPTION -- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index 714b6f1..f52d785 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -1,6 +1,6 @@ Package: SparkR Type: Package -Version: 2.4.1 +Version: 2.4.0 Title: R Frontend for Apache Spark Description: Provides an R Frontend for Apache Spark. Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"), http://git-wip-us.apache.org/repos/asf/spark/blob/42f25f30/assembly/pom.xml -- diff --git a/assembly/pom.xml b/assembly/pom.xml index ee0de73..63ab510 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -21,7 +21,7 @@ org.apache.spark spark-parent_2.11 -2.4.1-SNAPSHOT +2.4.0 ../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/42f25f30/common/kvstore/pom.xml -- diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml index b89e0fe..b10e118 100644 --- a/common/kvstore/pom.xml +++ b/common/kvstore/pom.xml @@ -22,7 +22,7 @@ org.apache.spark spark-parent_2.11 -2.4.1-SNAPSHOT +2.4.0 ../../pom.xml http://git-wip-us.apache.org/repos/asf/spark/blob/42f25f30/common/network-common/pom.xml
[spark] Git Push Summary
Repository: spark Updated Tags: refs/tags/v2.4.0-rc2 [created] 42f25f309 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-24341][SQL][FOLLOWUP] remove duplicated error checking
Repository: spark Updated Branches: refs/heads/master f856fe483 -> a1adde540 [SPARK-24341][SQL][FOLLOWUP] remove duplicated error checking ## What changes were proposed in this pull request? There are 2 places we check for problematic `InSubquery`: the rule `ResolveSubquery` and `InSubquery.checkInputDataTypes`. We should unify them. ## How was this patch tested? existing tests Closes #22563 from cloud-fan/followup. Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1adde54 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1adde54 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1adde54 Branch: refs/heads/master Commit: a1adde54086469b45950946d9143d17daab01f18 Parents: f856fe4 Author: Wenchen Fan Authored: Thu Sep 27 21:19:25 2018 +0800 Committer: Wenchen Fan Committed: Thu Sep 27 21:19:25 2018 +0800 -- .../spark/sql/catalyst/analysis/Analyzer.scala | 16 +- .../sql/catalyst/expressions/predicates.scala | 60 ++-- .../sql-tests/results/datetime.sql.out | 5 +- .../results/higher-order-functions.sql.out | 1 + .../subquery/in-subquery/in-basic.sql.out | 10 ++-- .../negative-cases/subq-input-typecheck.sql.out | 20 +++ 6 files changed, 49 insertions(+), 63 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a1adde54/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e3b1712..7034dfd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1436,21 +1436,7 @@ class Analyzer( val expr = resolveSubQuery(l, plans)((plan, exprs) => { ListQuery(plan, exprs, exprId, plan.output) }) - val 
subqueryOutput = expr.plan.output - val resolvedIn = InSubquery(values, expr.asInstanceOf[ListQuery]) - if (values.length != subqueryOutput.length) { -throw new AnalysisException( - s"""Cannot analyze ${resolvedIn.sql}. - |The number of columns in the left hand side of an IN subquery does not match the - |number of columns in the output of subquery. - |#columns in left hand side: ${values.length} - |#columns in right hand side: ${subqueryOutput.length} - |Left side columns: - |[${values.map(_.sql).mkString(", ")}] - |Right side columns: - |[${subqueryOutput.map(_.sql).mkString(", ")}]""".stripMargin) - } - resolvedIn + InSubquery(values, expr.asInstanceOf[ListQuery]) } } http://git-wip-us.apache.org/repos/asf/spark/blob/a1adde54/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 149bd79..2125340 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -144,7 +144,7 @@ case class Not(child: Expression) case class InSubquery(values: Seq[Expression], query: ListQuery) extends Predicate with Unevaluable { - @transient lazy val value: Expression = if (values.length > 1) { + @transient private lazy val value: Expression = if (values.length > 1) { CreateNamedStruct(values.zipWithIndex.flatMap { case (v: NamedExpression, _) => Seq(Literal(v.name), v) case (v, idx) => Seq(Literal(s"_$idx"), v) @@ -155,37 +155,35 @@ case class InSubquery(values: Seq[Expression], query: ListQuery) override def checkInputDataTypes(): TypeCheckResult = { -val mismatchOpt = !DataType.equalsStructurally(query.dataType, value.dataType, - ignoreNullability = true) -if (mismatchOpt) { - if (values.length != query.childOutputs.length) 
{ -TypeCheckResult.TypeCheckFailure( - s""" - |The number of columns in the left hand side of an IN subquery does not match the - |number of columns in the output of subquery. - |#columns in left hand side: ${values.length}. - |#columns in right hand side:
svn commit: r29737 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_27_06_02-3c78ea2-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Sep 27 13:17:19 2018 New Revision: 29737 Log: Apache Spark 2.4.1-SNAPSHOT-2018_09_27_06_02-3c78ea2 docs [This commit notification would consist of 1472 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21436][CORE] Take advantage of known partitioner for distinct on RDDs to avoid a shuffle
Repository: spark Updated Branches: refs/heads/master dd8f6b1ce -> f856fe483 [SPARK-21436][CORE] Take advantage of known partitioner for distinct on RDDs to avoid a shuffle ## What changes were proposed in this pull request? Special case the situation where we know the partioner and the number of requested partions output is the same as the current partioner to avoid a shuffle and instead compute distinct inside of each partion. ## How was this patch tested? New unit test that verifies partitioner does not change if the partitioner is known and distinct is called with the same target # of partition. Closes #22010 from holdenk/SPARK-21436-take-advantage-of-known-partioner-for-distinct-on-rdds. Authored-by: Holden Karau Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f856fe48 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f856fe48 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f856fe48 Branch: refs/heads/master Commit: f856fe4839757e3a1036df3fc3dec459fa439aef Parents: dd8f6b1 Author: Holden Karau Authored: Thu Sep 27 20:57:56 2018 +0800 Committer: Wenchen Fan Committed: Thu Sep 27 20:57:56 2018 +0800 -- .../src/main/scala/org/apache/spark/rdd/RDD.scala | 18 -- .../scala/org/apache/spark/rdd/RDDSuite.scala | 12 2 files changed, 28 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f856fe48/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 61ad6df..743e344 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -42,7 +42,8 @@ import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.util.{BoundedPriorityQueue, 
Utils} -import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils} +import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap, + Utils => collectionUtils} import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler, SamplingUtils} @@ -396,7 +397,20 @@ abstract class RDD[T: ClassTag]( * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { -map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = { + // Create an instance of external append only map which ignores values. + val map = new ExternalAppendOnlyMap[T, Null, Null]( +createCombiner = value => null, +mergeValue = (a, b) => a, +mergeCombiners = (a, b) => a) + map.insertAll(partition.map(_ -> null)) + map.iterator.map(_._1) +} +partitioner match { + case Some(p) if numPartitions == partitions.length => +mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true) + case _ => map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) +} } /** http://git-wip-us.apache.org/repos/asf/spark/blob/f856fe48/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index b143a46..2227698 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assert(!deserial.toString().isEmpty()) } + test("distinct with known partitioner preserves partitioning") { +val rdd = sc.parallelize(1.to(100), 10).map(x => (x % 10, x % 10)).sortByKey() +val initialPartitioner = rdd.partitioner +val distinctRdd = rdd.distinct() +val resultingPartitioner = distinctRdd.partitioner 
+assert(initialPartitioner === resultingPartitioner) +val distinctRddDifferent = rdd.distinct(5) +val distinctRddDifferentPartitioner = distinctRddDifferent.partitioner +assert(initialPartitioner != distinctRddDifferentPartitioner) +assert(distinctRdd.collect().sorted === distinctRddDifferent.collect().sorted) + } + test("countApproxDistinct") { def error(est: Long, size: Long): Double = math.abs(est - size) / size.toDouble -
spark git commit: [SPARK-25541][SQL][FOLLOWUP] Remove overriding filterKeys in CaseInsensitiveMap
Repository: spark Updated Branches: refs/heads/master 86a2450e0 -> dd8f6b1ce [SPARK-25541][SQL][FOLLOWUP] Remove overriding filterKeys in CaseInsensitiveMap ## What changes were proposed in this pull request? As per the discussion in https://github.com/apache/spark/pull/22553#pullrequestreview-159192221, override `filterKeys` violates the documented semantics. This PR is to remove it and add documentation. Also fix one potential non-serializable map in `FileStreamOptions`. The only one call of `CaseInsensitiveMap`'s `filterKeys` left is https://github.com/apache/spark/blob/c3c45cbd76d91d591d98cf8411fcfd30079f5969/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala#L88-L90 But this one is OK. ## How was this patch tested? Existing unit tests. Closes #22562 from gengliangwang/SPARK-25541-FOLLOWUP. Authored-by: Gengliang Wang Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd8f6b1c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd8f6b1c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd8f6b1c Branch: refs/heads/master Commit: dd8f6b1ce8ae7b2b75efda863fea40b29d52f657 Parents: 86a2450 Author: Gengliang Wang Authored: Thu Sep 27 19:53:13 2018 +0800 Committer: Wenchen Fan Committed: Thu Sep 27 19:53:13 2018 +0800 -- .../apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala| 6 ++ .../spark/sql/catalyst/util/CaseInsensitiveMapSuite.scala | 6 -- .../spark/sql/execution/streaming/FileStreamOptions.scala | 3 +-- 3 files changed, 3 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dd8f6b1c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala index 288a4f3..06f9598 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala @@ -24,6 +24,8 @@ import java.util.Locale * case-sensitive information is required. The primary constructor is marked private to avoid * nested case-insensitive map creation, otherwise the keys in the original map will become * case-insensitive in this scenario. + * Note: CaseInsensitiveMap is serializable. However, after transformation, e.g. `filterKeys()`, + * it may become not serializable. */ class CaseInsensitiveMap[T] private (val originalMap: Map[String, T]) extends Map[String, T] with Serializable { @@ -44,10 +46,6 @@ class CaseInsensitiveMap[T] private (val originalMap: Map[String, T]) extends Ma override def -(key: String): Map[String, T] = { new CaseInsensitiveMap(originalMap.filter(!_._1.equalsIgnoreCase(key))) } - - override def filterKeys(p: (String) => Boolean): Map[String, T] = { -new CaseInsensitiveMap(originalMap.filter(kv => p(kv._1))) - } } object CaseInsensitiveMap { http://git-wip-us.apache.org/repos/asf/spark/blob/dd8f6b1c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMapSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMapSuite.scala index 03eed4a..a8bb1d0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMapSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMapSuite.scala @@ -44,10 +44,4 @@ class CaseInsensitiveMapSuite extends SparkFunSuite { assert(m == Map("a" -> "b", "foo" -> "bar", "x" -> "y")) shouldBeSerializable(m) } - - test("CaseInsensitiveMap should be serializable after 'filterKeys' method") { -val m = CaseInsensitiveMap(Map("a" -> "b", "foo" -> "bar")).filterKeys(_ == "foo") 
-assert(m == Map("foo" -> "bar")) -shouldBeSerializable(m) - } } http://git-wip-us.apache.org/repos/asf/spark/blob/dd8f6b1c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index d54ed44..1d57cb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
spark git commit: [SPARK-25522][SQL] Improve type promotion for input arguments of elementAt function
Repository: spark Updated Branches: refs/heads/branch-2.4 53eb85854 -> 3c78ea258 [SPARK-25522][SQL] Improve type promotion for input arguments of elementAt function ## What changes were proposed in this pull request? In ElementAt, when first argument is MapType, we should coerce the key type and the second argument based on findTightestCommonType. This is not happening currently. We may produce wrong output as we will incorrectly downcast the right hand side double expression to int. ```SQL spark-sql> select element_at(map(1,"one", 2, "two"), 2.2); two ``` Also, when the first argument is ArrayType, the second argument should be an integer type or a smaller integral type that can be safely casted to an integer type. Currently we may do an unsafe cast. In the following case, we should fail with an error as 2.2 is not a integer index. But instead we down cast it to int currently and return a result instead. ```SQL spark-sql> select element_at(array(1,2), 1.24D); 1 ``` This PR also supports implicit cast between two MapTypes. I have followed similar logic that exists today to do implicit casts between two array types. ## How was this patch tested? Added new tests in DataFrameFunctionSuite, TypeCoercionSuite. Closes #22544 from dilipbiswal/SPARK-25522. 
Authored-by: Dilip Biswal Signed-off-by: Wenchen Fan (cherry picked from commit d03e0af80d7659f12821cc2442efaeaee94d3985) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c78ea25 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c78ea25 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c78ea25 Branch: refs/heads/branch-2.4 Commit: 3c78ea2589e1e2f3824ae6fa273eceaee3934391 Parents: 53eb858 Author: Dilip Biswal Authored: Thu Sep 27 15:04:59 2018 +0800 Committer: Wenchen Fan Committed: Thu Sep 27 19:50:01 2018 +0800 -- .../sql/catalyst/analysis/TypeCoercion.scala| 19 + .../spark/sql/catalyst/expressions/Cast.scala | 2 +- .../expressions/collectionOperations.scala | 37 ++ .../catalyst/analysis/TypeCoercionSuite.scala | 43 +-- .../spark/sql/DataFrameFunctionsSuite.scala | 75 +++- 5 files changed, 154 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c78ea25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 49d286f..72ac80e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -950,6 +950,25 @@ object TypeCoercion { if !Cast.forceNullable(fromType, toType) => implicitCast(fromType, toType).map(ArrayType(_, false)).orNull +// Implicit cast between Map types. +// Follows the same semantics of implicit casting between two array types. +// Refer to documentation above. Make sure that both key and values +// can not be null after the implicit cast operation by calling forceNullable +// method. 
+case (MapType(fromKeyType, fromValueType, fn), MapType(toKeyType, toValueType, tn)) +if !Cast.forceNullable(fromKeyType, toKeyType) && Cast.resolvableNullability(fn, tn) => + if (Cast.forceNullable(fromValueType, toValueType) && !tn) { +null + } else { +val newKeyType = implicitCast(fromKeyType, toKeyType).orNull +val newValueType = implicitCast(fromValueType, toValueType).orNull +if (newKeyType != null && newValueType != null) { + MapType(newKeyType, newValueType, tn) +} else { + null +} + } + case _ => null } Option(ret) http://git-wip-us.apache.org/repos/asf/spark/blob/3c78ea25/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 8f77799..ee463bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -183,7 +183,7 @@ object Cast { case _ => false } - private def resolvableNullability(from: Boolean, to: Boolean) = !from ||
spark-website git commit: Update my affiliation
Repository: spark-website Updated Branches: refs/heads/asf-site 74d902cdc -> 8b7444182 Update my affiliation Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/8b744418 Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/8b744418 Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/8b744418 Branch: refs/heads/asf-site Commit: 8b7444182083e968e6dbfd1def2f5cb1635b2465 Parents: 74d902c Author: jerryshao Authored: Thu Sep 27 19:42:30 2018 +0800 Committer: jerryshao Committed: Thu Sep 27 19:42:30 2018 +0800 -- committers.md| 2 +- site/committers.html | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark-website/blob/8b744418/committers.md -- diff --git a/committers.md b/committers.md index b64e278..957ed4c 100644 --- a/committers.md +++ b/committers.md @@ -61,7 +61,7 @@ navigation: |Josh Rosen|Databricks| |Sandy Ryza|Remix| |Kousuke Saruta|NTT Data| -|Saisai Shao|Hortonworks| +|Saisai Shao|Tencent| |Prashant Sharma|IBM| |Ram Sriharsha|Databricks| |DB Tsai|Apple| http://git-wip-us.apache.org/repos/asf/spark-website/blob/8b744418/site/committers.html -- diff --git a/site/committers.html b/site/committers.html index bc65924..7bc47a0 100644 --- a/site/committers.html +++ b/site/committers.html @@ -416,7 +416,7 @@ Saisai Shao - Hortonworks + Tencent Prashant Sharma - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25551][SQL] Remove unused InSubquery expression
Repository: spark Updated Branches: refs/heads/master 2a8cbfddb -> 86a2450e0 [SPARK-25551][SQL] Remove unused InSubquery expression ## What changes were proposed in this pull request? The PR removes the `InSubquery` expression which was introduced a long time ago and its only usage was removed in https://github.com/apache/spark/commit/4ce970d71488c7de6025ef925f75b8b92a5a6a79. Hence it is not used anymore. ## How was this patch tested? existing UTs Closes #22556 from mgaido91/minor_insubq. Authored-by: Marco Gaido Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86a2450e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86a2450e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86a2450e Branch: refs/heads/master Commit: 86a2450e09cbd3affbd66139ce4ed2b807e7b3b3 Parents: 2a8cbfd Author: Marco Gaido Authored: Thu Sep 27 19:34:05 2018 +0800 Committer: Wenchen Fan Committed: Thu Sep 27 19:34:05 2018 +0800 -- .../apache/spark/sql/execution/subquery.scala | 43 1 file changed, 43 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/86a2450e/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index d11045f..310ebcd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -91,49 +91,6 @@ case class ScalarSubquery( } /** - * A subquery that will check the value of `child` whether is in the result of a query or not. 
- */ -case class InSubquery( -child: Expression, -plan: SubqueryExec, -exprId: ExprId, -private var result: Array[Any] = null, -private var updated: Boolean = false) extends ExecSubqueryExpression { - - override def dataType: DataType = BooleanType - override def children: Seq[Expression] = child :: Nil - override def nullable: Boolean = child.nullable - override def toString: String = s"$child IN ${plan.name}" - override def withNewPlan(plan: SubqueryExec): InSubquery = copy(plan = plan) - - override def semanticEquals(other: Expression): Boolean = other match { -case in: InSubquery => child.semanticEquals(in.child) && plan.sameResult(in.plan) -case _ => false - } - - def updateResult(): Unit = { -val rows = plan.executeCollect() -result = rows.map(_.get(0, child.dataType)).asInstanceOf[Array[Any]] -updated = true - } - - override def eval(input: InternalRow): Any = { -require(updated, s"$this has not finished") -val v = child.eval(input) -if (v == null) { - null -} else { - result.contains(v) -} - } - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -require(updated, s"$this has not finished") -InSet(child, result.toSet).doGenCode(ctx, ev) - } -} - -/** * Plans scalar subqueries from that are present in the given [[SparkPlan]]. */ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r29736 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_27_04_02-2a8cbfd-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Sep 27 11:17:10 2018 New Revision: 29736 Log: Apache Spark 2.5.0-SNAPSHOT-2018_09_27_04_02-2a8cbfd docs [This commit notification would consist of 1485 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r29734 - in /dev/spark/2.4.1-SNAPSHOT-2018_09_27_02_02-53eb858-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Sep 27 09:16:59 2018 New Revision: 29734 Log: Apache Spark 2.4.1-SNAPSHOT-2018_09_27_02_02-53eb858 docs [This commit notification would consist of 1472 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r29733 - in /dev/spark/2.5.0-SNAPSHOT-2018_09_27_00_02-f309b28-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Thu Sep 27 07:17:48 2018 New Revision: 29733 Log: Apache Spark 2.5.0-SNAPSHOT-2018_09_27_00_02-f309b28 docs [This commit notification would consist of 1485 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25314][SQL] Fix Python UDF accessing attributes from both side of join in join conditions
Repository: spark Updated Branches: refs/heads/branch-2.4 0b4e58187 -> 53eb85854 [SPARK-25314][SQL] Fix Python UDF accessing attributes from both side of join in join conditions ## What changes were proposed in this pull request? Thanks for bahchis reporting this. It is more like a follow up work for #16581, this PR fix the scenario of Python UDF accessing attributes from both side of join in join condition. ## How was this patch tested? Add regression tests in PySpark and `BatchEvalPythonExecSuite`. Closes #22326 from xuanyuanking/SPARK-25314. Authored-by: Yuanjian Li Signed-off-by: Wenchen Fan (cherry picked from commit 2a8cbfddba2a59d144b32910c68c22d0199093fe) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53eb8585 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53eb8585 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53eb8585 Branch: refs/heads/branch-2.4 Commit: 53eb8585452b8637c8264c5ceb8d5fe28e7ae936 Parents: 0b4e581 Author: Yuanjian Li Authored: Thu Sep 27 15:13:18 2018 +0800 Committer: Wenchen Fan Committed: Thu Sep 27 15:13:39 2018 +0800 -- python/pyspark/sql/tests.py | 64 .../sql/catalyst/optimizer/Optimizer.scala | 8 ++- .../spark/sql/catalyst/optimizer/joins.scala| 49 +++ 3 files changed, 119 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/53eb8585/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index cb186de..dece1da 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -552,6 +552,70 @@ class SQLTests(ReusedSQLTestCase): df = left.crossJoin(right).filter(f("a", "b")) self.assertEqual(df.collect(), [Row(a=1, b=1)]) +def test_udf_in_join_condition(self): +# regression test for SPARK-25314 +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1)]) +right = self.spark.createDataFrame([Row(b=1)]) +f = 
udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, f("a", "b")) +with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'): +df.collect() +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, b=1)]) + +def test_udf_in_left_semi_join_condition(self): +# regression test for SPARK-25314 +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, f("a", "b"), "leftsemi") +with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'): +df.collect() +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + +def test_udf_and_common_filter_in_join_condition(self): +# regression test for SPARK-25314 +# test the complex scenario with both udf and common filter +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, [f("a", "b"), left.a1 == right.b1]) +# do not need spark.sql.crossJoin.enabled=true for udf is not the only join condition. 
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=1, b1=1, b2=1)]) + +def test_udf_and_common_filter_in_left_semi_join_condition(self): +# regression test for SPARK-25314 +# test the complex scenario with both udf and common filter +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, [f("a", "b"), left.a1 == right.b1], "left_semi") +# do not need spark.sql.crossJoin.enabled=true for udf is not the only join condition. +self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + +def test_udf_not_supported_in_join_condition(self): +# regression test for SPARK-25314 +# test python udf is not supported in join type besides left_semi and inner join. +from
spark git commit: [SPARK-25314][SQL] Fix Python UDF accessing attributes from both side of join in join conditions
Repository: spark Updated Branches: refs/heads/master d03e0af80 -> 2a8cbfddb [SPARK-25314][SQL] Fix Python UDF accessing attributes from both side of join in join conditions ## What changes were proposed in this pull request? Thanks for bahchis reporting this. It is more like a follow up work for #16581, this PR fix the scenario of Python UDF accessing attributes from both side of join in join condition. ## How was this patch tested? Add regression tests in PySpark and `BatchEvalPythonExecSuite`. Closes #22326 from xuanyuanking/SPARK-25314. Authored-by: Yuanjian Li Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a8cbfdd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a8cbfdd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a8cbfdd Branch: refs/heads/master Commit: 2a8cbfddba2a59d144b32910c68c22d0199093fe Parents: d03e0af Author: Yuanjian Li Authored: Thu Sep 27 15:13:18 2018 +0800 Committer: Wenchen Fan Committed: Thu Sep 27 15:13:18 2018 +0800 -- python/pyspark/sql/tests.py | 64 .../sql/catalyst/optimizer/Optimizer.scala | 8 ++- .../spark/sql/catalyst/optimizer/joins.scala| 49 +++ 3 files changed, 119 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2a8cbfdd/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 64a7ceb..b88a655 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -596,6 +596,70 @@ class SQLTests(ReusedSQLTestCase): df = left.crossJoin(right).filter(f("a", "b")) self.assertEqual(df.collect(), [Row(a=1, b=1)]) +def test_udf_in_join_condition(self): +# regression test for SPARK-25314 +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1)]) +right = self.spark.createDataFrame([Row(b=1)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, f("a", "b")) +with 
self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'): +df.collect() +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, b=1)]) + +def test_udf_in_left_semi_join_condition(self): +# regression test for SPARK-25314 +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, f("a", "b"), "leftsemi") +with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'): +df.collect() +with self.sql_conf({"spark.sql.crossJoin.enabled": True}): +self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + +def test_udf_and_common_filter_in_join_condition(self): +# regression test for SPARK-25314 +# test the complex scenario with both udf and common filter +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, [f("a", "b"), left.a1 == right.b1]) +# do not need spark.sql.crossJoin.enabled=true for udf is not the only join condition. +self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=1, b1=1, b2=1)]) + +def test_udf_and_common_filter_in_left_semi_join_condition(self): +# regression test for SPARK-25314 +# test the complex scenario with both udf and common filter +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)]) +right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)]) +f = udf(lambda a, b: a == b, BooleanType()) +df = left.join(right, [f("a", "b"), left.a1 == right.b1], "left_semi") +# do not need spark.sql.crossJoin.enabled=true for udf is not the only join condition. 
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)]) + +def test_udf_not_supported_in_join_condition(self): +# regression test for SPARK-25314 +# test python udf is not supported in join type besides left_semi and inner join. +from pyspark.sql.functions import udf +left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
spark git commit: [SPARK-25522][SQL] Improve type promotion for input arguments of elementAt function
Repository: spark Updated Branches: refs/heads/master ff876137f -> d03e0af80 [SPARK-25522][SQL] Improve type promotion for input arguments of elementAt function ## What changes were proposed in this pull request? In ElementAt, when the first argument is MapType, we should coerce the key type and the second argument based on findTightestCommonType. This is not happening currently. We may produce wrong output as we will incorrectly downcast the right hand side double expression to int. ```SQL spark-sql> select element_at(map(1,"one", 2, "two"), 2.2); two ``` Also, when the first argument is ArrayType, the second argument should be an integer type or a smaller integral type that can be safely cast to an integer type. Currently we may do an unsafe cast. In the following case, we should fail with an error as 2.2 is not an integer index. But instead we currently downcast it to int and return a result. ```SQL spark-sql> select element_at(array(1,2), 1.24D); 1 ``` This PR also supports implicit cast between two MapTypes. I have followed similar logic that exists today to do implicit casts between two array types. ## How was this patch tested? Added new tests in DataFrameFunctionsSuite, TypeCoercionSuite. Closes #22544 from dilipbiswal/SPARK-25522. 
Authored-by: Dilip Biswal Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d03e0af8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d03e0af8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d03e0af8 Branch: refs/heads/master Commit: d03e0af80d7659f12821cc2442efaeaee94d3985 Parents: ff87613 Author: Dilip Biswal Authored: Thu Sep 27 15:04:59 2018 +0800 Committer: Wenchen Fan Committed: Thu Sep 27 15:04:59 2018 +0800 -- .../sql/catalyst/analysis/TypeCoercion.scala| 19 + .../spark/sql/catalyst/expressions/Cast.scala | 2 +- .../expressions/collectionOperations.scala | 37 ++ .../catalyst/analysis/TypeCoercionSuite.scala | 43 +-- .../spark/sql/DataFrameFunctionsSuite.scala | 75 +++- 5 files changed, 154 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d03e0af8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 49d286f..72ac80e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -950,6 +950,25 @@ object TypeCoercion { if !Cast.forceNullable(fromType, toType) => implicitCast(fromType, toType).map(ArrayType(_, false)).orNull +// Implicit cast between Map types. +// Follows the same semantics of implicit casting between two array types. +// Refer to documentation above. Make sure that both key and values +// can not be null after the implicit cast operation by calling forceNullable +// method. 
+case (MapType(fromKeyType, fromValueType, fn), MapType(toKeyType, toValueType, tn)) +if !Cast.forceNullable(fromKeyType, toKeyType) && Cast.resolvableNullability(fn, tn) => + if (Cast.forceNullable(fromValueType, toValueType) && !tn) { +null + } else { +val newKeyType = implicitCast(fromKeyType, toKeyType).orNull +val newValueType = implicitCast(fromValueType, toValueType).orNull +if (newKeyType != null && newValueType != null) { + MapType(newKeyType, newValueType, tn) +} else { + null +} + } + case _ => null } Option(ret) http://git-wip-us.apache.org/repos/asf/spark/blob/d03e0af8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 8f77799..ee463bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -183,7 +183,7 @@ object Cast { case _ => false } - private def resolvableNullability(from: Boolean, to: Boolean) = !from || to + def resolvableNullability(from: Boolean, to: Boolean): Boolean = !from || to } /**
spark git commit: [SPARK-23715][SQL][DOC] improve document for from/to_utc_timestamp
Repository: spark Updated Branches: refs/heads/branch-2.4 0cf4c5bbe -> 0b4e58187 [SPARK-23715][SQL][DOC] improve document for from/to_utc_timestamp ## What changes were proposed in this pull request? We have an agreement that the behavior of `from/to_utc_timestamp` is corrected, although the function itself doesn't make much sense in Spark: https://issues.apache.org/jira/browse/SPARK-23715 This PR improves the document. ## How was this patch tested? N/A Closes #22543 from cloud-fan/doc. Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan (cherry picked from commit ff876137faba1802b66ecd483ba15f6ccd83ffc5) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b4e5818 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b4e5818 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b4e5818 Branch: refs/heads/branch-2.4 Commit: 0b4e58187b787cc7a6d57a2a9d467934ece24252 Parents: 0cf4c5b Author: Wenchen Fan Authored: Thu Sep 27 15:02:20 2018 +0800 Committer: Wenchen Fan Committed: Thu Sep 27 15:02:52 2018 +0800 -- R/pkg/R/functions.R | 26 + python/pyspark/sql/functions.py | 30 .../expressions/datetimeExpressions.scala | 30 3 files changed, 68 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0b4e5818/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 572dee5..63bd427 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2203,9 +2203,16 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") }) #' @details -#' \code{from_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a -#' time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1' -#' would yield '2017-07-14 03:40:00.0'. +#' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT +#' TIMEZONE. 
This function takes a timestamp which is timezone-agnostic, and interprets it as a +#' timestamp in UTC, and renders that timestamp as a timestamp in the given time zone. +#' However, timestamp in Spark represents number of microseconds from the Unix epoch, which is not +#' timezone-agnostic. So in Spark this function just shift the timestamp value from UTC timezone to +#' the given timezone. +#' This function may return confusing result if the input is a string with timezone, e.g. +#' (\code{2018-03-13T06:18:23+00:00}). The reason is that, Spark firstly cast the string to +#' timestamp according to the timezone in the string, and finally display the result by converting +#' the timestamp to string according to the session local timezone. #' #' @rdname column_datetime_diff_functions #' @@ -2261,9 +2268,16 @@ setMethod("next_day", signature(y = "Column", x = "character"), }) #' @details -#' \code{to_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a -#' time in the given time zone, and renders that time as a timestamp in UTC. For example, 'GMT+1' -#' would yield '2017-07-14 01:40:00.0'. +#' \code{to_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT +#' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a +#' timestamp in the given timezone, and renders that timestamp as a timestamp in UTC. +#' However, timestamp in Spark represents number of microseconds from the Unix epoch, which is not +#' timezone-agnostic. So in Spark this function just shift the timestamp value from the given +#' timezone to UTC timezone. +#' This function may return confusing result if the input is a string with timezone, e.g. +#' (\code{2018-03-13T06:18:23+00:00}). The reason is that, Spark firstly cast the string to +#' timestamp according to the timezone in the string, and finally display the result by converting +#' the timestamp to string according to the session local timezone. 
#' #' @rdname column_datetime_diff_functions #' @aliases to_utc_timestamp to_utc_timestamp,Column,character-method http://git-wip-us.apache.org/repos/asf/spark/blob/0b4e5818/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 6da5237..8c54179 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1283,9 +1283,18 @@ def unix_timestamp(timestamp=None, format='-MM-dd HH:mm:ss'): @since(1.5) def from_utc_timestamp(timestamp, tz): """ -Given a
spark git commit: [SPARK-23715][SQL][DOC] improve document for from/to_utc_timestamp
Repository: spark Updated Branches: refs/heads/master f309b28bd -> ff876137f [SPARK-23715][SQL][DOC] improve document for from/to_utc_timestamp ## What changes were proposed in this pull request? We have an agreement that the behavior of `from/to_utc_timestamp` is corrected, although the function itself doesn't make much sense in Spark: https://issues.apache.org/jira/browse/SPARK-23715 This PR improves the document. ## How was this patch tested? N/A Closes #22543 from cloud-fan/doc. Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff876137 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff876137 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff876137 Branch: refs/heads/master Commit: ff876137faba1802b66ecd483ba15f6ccd83ffc5 Parents: f309b28 Author: Wenchen Fan Authored: Thu Sep 27 15:02:20 2018 +0800 Committer: Wenchen Fan Committed: Thu Sep 27 15:02:20 2018 +0800 -- R/pkg/R/functions.R | 26 + python/pyspark/sql/functions.py | 30 .../expressions/datetimeExpressions.scala | 30 3 files changed, 68 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ff876137/R/pkg/R/functions.R -- diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 6425c9d..2cb4cb8 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2204,9 +2204,16 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType") }) #' @details -#' \code{from_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a -#' time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1' -#' would yield '2017-07-14 03:40:00.0'. +#' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT +#' TIMEZONE. 
This function takes a timestamp which is timezone-agnostic, and interprets it as a +#' timestamp in UTC, and renders that timestamp as a timestamp in the given time zone. +#' However, timestamp in Spark represents number of microseconds from the Unix epoch, which is not +#' timezone-agnostic. So in Spark this function just shift the timestamp value from UTC timezone to +#' the given timezone. +#' This function may return confusing result if the input is a string with timezone, e.g. +#' (\code{2018-03-13T06:18:23+00:00}). The reason is that, Spark firstly cast the string to +#' timestamp according to the timezone in the string, and finally display the result by converting +#' the timestamp to string according to the session local timezone. #' #' @rdname column_datetime_diff_functions #' @@ -2262,9 +2269,16 @@ setMethod("next_day", signature(y = "Column", x = "character"), }) #' @details -#' \code{to_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a -#' time in the given time zone, and renders that time as a timestamp in UTC. For example, 'GMT+1' -#' would yield '2017-07-14 01:40:00.0'. +#' \code{to_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT +#' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a +#' timestamp in the given timezone, and renders that timestamp as a timestamp in UTC. +#' However, timestamp in Spark represents number of microseconds from the Unix epoch, which is not +#' timezone-agnostic. So in Spark this function just shift the timestamp value from the given +#' timezone to UTC timezone. +#' This function may return confusing result if the input is a string with timezone, e.g. +#' (\code{2018-03-13T06:18:23+00:00}). The reason is that, Spark firstly cast the string to +#' timestamp according to the timezone in the string, and finally display the result by converting +#' the timestamp to string according to the session local timezone. 
#' #' @rdname column_datetime_diff_functions #' @aliases to_utc_timestamp to_utc_timestamp,Column,character-method http://git-wip-us.apache.org/repos/asf/spark/blob/ff876137/python/pyspark/sql/functions.py -- diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 1c3d972..e5bc1ea 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1283,9 +1283,18 @@ def unix_timestamp(timestamp=None, format='-MM-dd HH:mm:ss'): @since(1.5) def from_utc_timestamp(timestamp, tz): """ -Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders -that time as a
spark git commit: [SPARK-25485][SQL][TEST] Refactor UnsafeProjectionBenchmark to use main method
Repository: spark Updated Branches: refs/heads/master 8b727994e -> f309b28bd [SPARK-25485][SQL][TEST] Refactor UnsafeProjectionBenchmark to use main method ## What changes were proposed in this pull request? Refactor `UnsafeProjectionBenchmark` to use main method. Generate benchmark result: ``` SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "catalyst/test:runMain org.apache.spark.sql.UnsafeProjectionBenchmark" ``` ## How was this patch tested? manual test Closes #22493 from yucai/SPARK-25485. Lead-authored-by: yucai Co-authored-by: Yucai Yu Co-authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f309b28b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f309b28b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f309b28b Branch: refs/heads/master Commit: f309b28bd9271719ca36fcf334f016ed6165a79b Parents: 8b72799 Author: yucai Authored: Wed Sep 26 23:27:45 2018 -0700 Committer: Dongjoon Hyun Committed: Wed Sep 26 23:27:45 2018 -0700 -- .../UnsafeProjectionBenchmark-results.txt | 14 ++ .../spark/sql/UnsafeProjectionBenchmark.scala | 172 +-- 2 files changed, 98 insertions(+), 88 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/f309b28b/sql/catalyst/benchmarks/UnsafeProjectionBenchmark-results.txt -- diff --git a/sql/catalyst/benchmarks/UnsafeProjectionBenchmark-results.txt b/sql/catalyst/benchmarks/UnsafeProjectionBenchmark-results.txt new file mode 100644 index 000..43156dc --- /dev/null +++ b/sql/catalyst/benchmarks/UnsafeProjectionBenchmark-results.txt @@ -0,0 +1,14 @@ + +unsafe projection + + +OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +unsafe projection: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +single long 2867 / 2868 93.6 10.7 1.0X +single nullable long 3915 / 3949 68.6 14.6 0.7X +7 primitive types 8166 / 8167 32.9 30.4 0.4X +7 nullable 
primitive types 12767 / 12767 21.0 47.6 0.2X + + http://git-wip-us.apache.org/repos/asf/spark/blob/f309b28b/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala index faff681..cbe723f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/UnsafeProjectionBenchmark.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import org.apache.spark.benchmark.Benchmark +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.catalyst.expressions.UnsafeProjection @@ -25,8 +25,15 @@ import org.apache.spark.sql.types._ /** * Benchmark `UnsafeProjection` for fixed-length/primitive-type fields. + * {{{ + * To run this benchmark: + * 1. without sbt: bin/spark-submit --class + * 2. build/sbt "sql/test:runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " + * Results will be written to "benchmarks/UnsafeProjectionBenchmark-results.txt". 
+ * }}} */ -object UnsafeProjectionBenchmark { +object UnsafeProjectionBenchmark extends BenchmarkBase { def generateRows(schema: StructType, numRows: Int): Array[InternalRow] = { val generator = RandomDataGenerator.forType(schema, nullable = false).get @@ -34,103 +41,92 @@ object UnsafeProjectionBenchmark { (1 to numRows).map(_ => encoder.toRow(generator().asInstanceOf[Row]).copy()).toArray } - def main(args: Array[String]) { -val iters = 1024 * 16 -val numRows = 1024 * 16 - -val benchmark = new Benchmark("unsafe projection", iters * numRows.toLong) - - -val schema1 = new StructType().add("l", LongType, false) -val attrs1 = schema1.toAttributes -val rows1 = generateRows(schema1, numRows) -val projection1 =