[spark] branch master updated (e9ccf4a -> e9af457)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from e9ccf4a  [SPARK-35640][SQL] Refactor Parquet vectorized reader to remove duplicated code paths
 add e9af457  [SPARK-35718][SQL] Support casting of Date to timestamp without time zone type

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/expressions/Cast.scala      | 24 +++---
 .../spark/sql/catalyst/expressions/CastSuite.scala |  9
 2 files changed, 30 insertions(+), 3 deletions(-)
[spark] branch master updated (463daab -> e9ccf4a)
This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 463daab  [SPARK-34512][BUILD][SQL] Upgrade built-in Hive to 2.3.9
 add e9ccf4a  [SPARK-35640][SQL] Refactor Parquet vectorized reader to remove duplicated code paths

No new revisions were added by this update.

Summary of changes:
 .../datasources/parquet/ParquetVectorUpdater.java  |   86 ++
 .../parquet/ParquetVectorUpdaterFactory.java       | 1005
 .../parquet/VectorizedColumnReader.java            |  597 +---
 .../parquet/VectorizedRleValuesReader.java         |  433 +
 .../datasources/parquet/ParquetIOSuite.scala       |   22 +
 5 files changed, 1144 insertions(+), 999 deletions(-)
 create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdater.java
 create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
[spark] branch master updated (912d60b -> 463daab)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 912d60b  [SPARK-35709][DOCS] Remove the reference to third party Nomad integration project
 add 463daab  [SPARK-34512][BUILD][SQL] Upgrade built-in Hive to 2.3.9

No new revisions were added by this update.

Summary of changes:
 dev/deps/spark-deps-hadoop-2.7-hive-2.3            | 26 +-
 dev/deps/spark-deps-hadoop-3.2-hive-2.3            | 26 +-
 docs/building-spark.md                             |  4 +--
 docs/sql-data-sources-hive-tables.md               |  8 +++--
 docs/sql-migration-guide.md                        |  2 +-
 pom.xml                                            |  4 +--
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala |  2 +-
 .../org/apache/spark/sql/hive/HiveUtils.scala      |  2 +-
 .../org/apache/spark/sql/hive/client/package.scala |  2 +-
 .../hive/HiveExternalCatalogVersionsSuite.scala    |  2 +-
 .../hive/execution/HiveSerDeReadWriteSuite.scala   | 31 ++
 11 files changed, 70 insertions(+), 39 deletions(-)
[spark] branch master updated (cf07036 -> 912d60b)
This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from cf07036  [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs
 add 912d60b  [SPARK-35709][DOCS] Remove the reference to third party Nomad integration project

No new revisions were added by this update.

Summary of changes:
 docs/cluster-overview.md | 3 ---
 1 file changed, 3 deletions(-)
[spark] branch master updated (a97885b -> cf07036)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from a97885b  [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
 add cf07036  [SPARK-35593][K8S][CORE] Support shuffle data recovery on the reused PVCs

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/MapOutputTracker.scala  |  22 ++-
 .../shuffle/KubernetesLocalDiskShuffleDataIO.scala |  24 +--
 ...ernetesLocalDiskShuffleExecutorComponents.scala | 102 ++
 .../KubernetesLocalDiskShuffleDataIOSuite.scala    | 219 +
 4 files changed, 353 insertions(+), 14 deletions(-)
 copy core/src/main/scala/org/apache/spark/shuffle/ShuffleWriter.scala => resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIO.scala (53%)
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleExecutorComponents.scala
 create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/shuffle/KubernetesLocalDiskShuffleDataIOSuite.scala
[spark] branch master updated: [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new a97885b  [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

a97885b is described below

commit a97885bb2c81563a18c99df233fb9e99ff368c9c
Author: Ye Zhou
AuthorDate: Thu Jun 10 16:57:46 2021 -0500

    [SPARK-33350][SHUFFLE] Add support to DiskBlockManager to create merge directory and to get the local shuffle merged data

    ### What changes were proposed in this pull request?
    This is one of the patches for SPIP SPARK-30602, which is needed for push-based shuffle.

    ### Summary of changes:
    - The executor creates the merge directories under the application temp directory provided by YARN. The access control of the folder is set to 770, so that the shuffle service can create merged shuffle files and write merged shuffle data into those files.
    - Serve merged shuffle block fetch requests by reading the merged shuffle blocks.

    ### Why are the changes needed?
    Refer to the SPIP in SPARK-30602.

    ### Does this PR introduce any user-facing change?
    No

    ### How was this patch tested?
    Added unit tests. The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602. We have already verified the functionality and the improved performance as documented in the SPIP doc.

    Lead-authored-by: Min Shen <mshen@linkedin.com>
    Co-authored-by: Chandni Singh <chsingh@linkedin.com>
    Co-authored-by: Ye Zhou <yezhou@linkedin.com>

    Closes #32007 from zhouyejoe/SPARK-33350.

    Lead-authored-by: Ye Zhou
    Co-authored-by: Chandni Singh
    Co-authored-by: Min Shen
    Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java   |   8 +-
 .../scala/org/apache/spark/MapOutputTracker.scala  |   2 +-
 .../spark/shuffle/IndexShuffleBlockResolver.scala  |  73 +-
 .../spark/shuffle/ShuffleBlockResolver.scala       |  13 ++-
 .../scala/org/apache/spark/storage/BlockId.scala   |  37 +++
 .../org/apache/spark/storage/BlockManager.scala    |  26 -
 .../apache/spark/storage/DiskBlockManager.scala    | 112 +
 .../main/scala/org/apache/spark/util/Utils.scala   |  28 +-
 .../shuffle/HostLocalShuffleReadingSuite.scala     |   9 ++
 .../sort/IndexShuffleBlockResolverSuite.scala      |  93 -
 .../org/apache/spark/storage/BlockIdSuite.scala    |  36 +++
 .../spark/storage/DiskBlockManagerSuite.scala      |  39 +++
 .../scala/org/apache/spark/util/UtilsSuite.scala   |  12 ++-
 13 files changed, 470 insertions(+), 18 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 1ac33cd..47d2547 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -75,6 +75,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class);
   @VisibleForTesting
   static final String MERGE_MANAGER_DIR = "merge_manager";
+  public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged";

   private final ConcurrentMap appsPathInfo;
   private final ConcurrentMap> partitions;
@@ -211,7 +212,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   /**
    * The logic here is consistent with
-   * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+   * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
+   *      org.apache.spark.storage.BlockId, scala.Option)]]
    */
   private File getFile(String appId, String filename) {
     // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
@@ -431,8 +433,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         executorInfo.subDirsPerLocalDir));
   }

   private static String generateFileName(AppShuffleId appShuffleId, int reduceId) {
-    return String.format("mergedShuffle_%s_%d_%d", appShuffleId.appId, appShuffleId.shuffleId,
-      reduceId);
+    return String.format("%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, appShuffleId.appId,
+      appShuffleId.shuffleId, reduceId);
   }

   /**
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 9f2228b..003b10f 100644
--- a/core/src/main/s
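As an aside, the "770 access control" setup described in this commit message can be illustrated outside Spark. The sketch below is not Spark's actual `DiskBlockManager` code; the path is hypothetical and a POSIX filesystem is assumed.

```scala
import java.nio.file.{Files, Path, Paths}
import java.nio.file.attribute.PosixFilePermissions

// Hypothetical location standing in for the YARN-provided application temp directory.
val mergeDir: Path = Paths.get("/tmp/app-temp/merge_manager")
Files.createDirectories(mergeDir)

// rwx for owner and group, nothing for others -- i.e. mode 770, so the external
// shuffle service (running as the same group) can create and write merged shuffle
// files in this directory.
Files.setPosixFilePermissions(mergeDir, PosixFilePermissions.fromString("rwxrwx---"))
```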
[spark] branch master updated (b4b78ce -> bc1edba)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from b4b78ce  [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception
 add bc1edba  [SPARK-35692][K8S] Use AtomicInteger for executor id generating

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 7 +++
 .../spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 6 ++
 2 files changed, 9 insertions(+), 4 deletions(-)
[spark] branch branch-3.1 updated: [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 0f3a251  [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception

0f3a251 is described below

commit 0f3a251af0795bfa4af75ce1efa6a845a31362fa
Author: Kent Yao
AuthorDate: Thu Jun 10 13:39:39 2021 -0700

    [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception

    ### What changes were proposed in this pull request?
    A follow-up for SPARK-32975 to avoid the unexpected `None.get` exception.

    When running SparkPi with Docker Desktop, since podName is an Option, we get:
    ```logtalk
    21/06/09 01:09:12 ERROR Utils: Uncaught exception in thread main
    java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:529)
        at scala.None$.get(Option.scala:527)
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:110)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1417)
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.start(ExecutorPodsAllocator.scala:111)
        at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.start(KubernetesClusterSchedulerBackend.scala:99)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:581)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2686)
        at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:948)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:942)
        at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:30)
        at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    ```

    ### Why are the changes needed?
    Fix a regression.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Manual.

    Closes #32830 from yaooqinn/SPARK-32975.

    Authored-by: Kent Yao
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit b4b78ce26567ce7ab83d47ce3b6af87c866bcacb)
    Signed-off-by: Dongjoon Hyun
---
 .../scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 16 +---
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 358058e..5429e36 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -102,13 +102,15 @@ private[spark] class ExecutorPodsAllocator(
   @volatile private var deletedExecutorIds = Set.empty[Long]

   def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
-    // Wait until the driver pod is ready before starting executors, as the headless service won't
-    // be resolvable by DNS until the driver pod is ready.
-    Utils.tryLogNonFatalError {
-      kubernetesClient
-        .pods()
-        .withName(kubernetesDriverPodName.get)
-        .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
+    driverPod.foreach { pod =>
+      // Wait until the driver pod is ready before starting executors, as the headless service won't
+      // be resolvable by DNS until the driver pod is ready.
+      Utils.tryLogNonFatalError {
+        kubernetesClient
+          .pods()
+          .withName(pod.getMet
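The fix in the diff is a general Scala pattern worth calling out: guard optional state with `foreach` instead of calling `.get`. Below is a minimal standalone sketch; the environment variable is just an assumed stand-in for the optional driver pod name.

```scala
// Option.get throws java.util.NoSuchElementException on None; foreach makes the
// absent case a no-op instead.
val driverPodName: Option[String] = sys.env.get("SPARK_DRIVER_POD_NAME") // assumed source

// Unsafe: fails exactly like the stack trace above when the name is absent.
// val name = driverPodName.get

// Safe: the readiness wait simply does not run when there is no driver pod.
driverPodName.foreach { name =>
  println(s"waiting for driver pod $name to become ready")
}
```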
[spark] branch master updated (d21ff13 -> b4b78ce)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from d21ff13  [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
 add b4b78ce  [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception

No new revisions were added by this update.

Summary of changes:
 .../scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 16 +---
 1 file changed, 9 insertions(+), 7 deletions(-)
[spark] branch master updated: [SPARK-35716][SQL] Support casting of timestamp without time zone to date type
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d21ff13  [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

d21ff13 is described below

commit d21ff1318f614fc207d9cd3c485e4337faa8e878
Author: Gengliang Wang
AuthorDate: Thu Jun 10 23:37:02 2021 +0300

    [SPARK-35716][SQL] Support casting of timestamp without time zone to date type

    ### What changes were proposed in this pull request?
    Extend the Cast expression to support TimestampWithoutTZType when casting to DateType.

    ### Why are the changes needed?
    To conform to the ANSI SQL standard, which requires support for such casting.

    ### Does this PR introduce _any_ user-facing change?
    No, the new timestamp type is not released yet.

    ### How was this patch tested?
    Unit test.

    Closes #32869 from gengliangwang/castToDate.

    Authored-by: Gengliang Wang
    Signed-off-by: Max Gekk
---
 .../org/apache/spark/sql/catalyst/expressions/Cast.scala      |  9 +
 .../org/apache/spark/sql/catalyst/expressions/CastSuite.scala | 11 +--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 8de19ba..fba17d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -72,6 +72,7 @@ object Cast {
   case (StringType, DateType) => true
   case (TimestampType, DateType) => true
+  case (TimestampWithoutTZType, DateType) => true

   case (StringType, CalendarIntervalType) => true
   case (StringType, DayTimeIntervalType) => true
@@ -534,6 +535,8 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
       // throw valid precision more than seconds, according to Hive.
       // Timestamp.nanos is in 0 to 999,999,999, no more than a second.
       buildCast[Long](_, t => microsToDays(t, zoneId))
+    case TimestampWithoutTZType =>
+      buildCast[Long](_, t => microsToDays(t, ZoneOffset.UTC))
   }

   // IntervalConverter
@@ -1204,6 +1207,11 @@ abstract class CastBase extends UnaryExpression with TimeZoneAwareExpression wit
         (c, evPrim, evNull) =>
           code"""$evPrim = org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToDays($c, $zid);"""
+      case TimestampWithoutTZType =>
+        (c, evPrim, evNull) =>
+          // scalastyle:off line.size.limit
+          code"$evPrim = org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToDays($c, java.time.ZoneOffset.UTC);"
+          // scalastyle:on line.size.limit
       case _ =>
         (c, evPrim, evNull) => code"$evNull = true;"
   }
@@ -1953,6 +1961,7 @@ object AnsiCast {
   case (StringType, DateType) => true
   case (TimestampType, DateType) => true
+  case (TimestampWithoutTZType, DateType) => true

   case (_: NumericType, _: NumericType) => true
   case (StringType, _: NumericType) => true

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
index a4e4257..51a7740 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions

 import java.sql.{Date, Timestamp}
-import java.time.{DateTimeException, Duration, LocalDateTime, Period}
+import java.time.{DateTimeException, Duration, LocalDate, LocalDateTime, Period}
 import java.time.temporal.ChronoUnit
 import java.util.{Calendar, TimeZone}
@@ -1256,10 +1256,17 @@ abstract class AnsiCastSuiteBase extends CastSuiteBase {

   test("SPARK-35711: cast timestamp without time zone to timestamp with local time zone") {
     specialTs.foreach { s =>
-      val dt = LocalDateTime.parse(s.replace(" ", "T"))
+      val dt = LocalDateTime.parse(s)
       checkEvaluation(cast(dt, TimestampType), DateTimeUtils.localDateTimeToMicros(dt))
     }
   }
+
+  test("SPARK-35716: cast timestamp without time zone to date type") {
+    specialTs.foreach { s =>
+      val dt = LocalDateTime.parse(s)
+      checkEvaluation(cast(dt, DateType), LocalDate.parse(s.split("T")(0)))
+    }
+  }
 }

 /**
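For intuition, the UTC conversion used above can be sketched in plain java.time. This is a hedged re-implementation, not the actual `DateTimeUtils.microsToDays`: a timestamp without time zone carries no zone of its own, so the patch evaluates the local date at a fixed `ZoneOffset.UTC`.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.temporal.ChronoUnit

// Microseconds since the epoch -> days since the epoch, interpreted at UTC.
def microsToDaysUtc(micros: Long): Int =
  Instant.EPOCH.plus(micros, ChronoUnit.MICROS)
    .atZone(ZoneOffset.UTC)
    .toLocalDate
    .toEpochDay
    .toInt

// Example: 2021-06-10T00:00:00 is 18788 days after 1970-01-01, so
// microsToDaysUtc(18788L * 24 * 60 * 60 * 1000 * 1000) == 18788.
```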
[spark] branch master updated: [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b5a1503  [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

b5a1503 is described below

commit b5a15035851bfba12ef1c68d10103cec42cbac0c
Author: Venkata krishnan Sowrirajan
AuthorDate: Thu Jun 10 13:06:15 2021 -0500

    [SPARK-32920][SHUFFLE] Finalization of Shuffle push/merge with Push based shuffle and preparation step for the reduce stage

    ### What changes were proposed in this pull request?
    Summary of the changes made as part of this PR:
    1. `DAGScheduler` changes to finalize a ShuffleMapStage, which involves talking to all the shuffle mergers (`ExternalShuffleService`) and collecting all the completed merge statuses.
    2. Once the `ShuffleMapStage` finalization is complete, mark the `ShuffleMapStage` as finalized, which marks the stage as complete and subsequently lets the child stage start.
    3. Also added the relevant tests to `DAGSchedulerSuite` for the changes made as part of [SPARK-32919](https://issues.apache.org/jira/browse/SPARK-32919).

    Lead-authored-by: Min Shen <mshen@linkedin.com>
    Co-authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
    Co-authored-by: Chandni Singh <chsingh@linkedin.com>

    ### Why are the changes needed?
    Refer to [SPARK-30602](https://issues.apache.org/jira/browse/SPARK-30602).

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Added unit tests to DAGSchedulerSuite.

    Closes #30691 from venkata91/SPARK-32920.

    Lead-authored-by: Venkata krishnan Sowrirajan
    Co-authored-by: Min Shen
    Co-authored-by: Chandni Singh
    Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../main/scala/org/apache/spark/Dependency.scala    |  38 ++
 .../scala/org/apache/spark/MapOutputTracker.scala   |  44 +-
 .../org/apache/spark/internal/config/package.scala  |  23 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala   | 257 +---
 .../apache/spark/scheduler/DAGSchedulerEvent.scala  |   6 +
 .../org/apache/spark/scheduler/StageInfo.scala      |   2 +-
 ...g.apache.spark.scheduler.ExternalClusterManager |   1 +
 .../apache/spark/scheduler/DAGSchedulerSuite.scala  | 448 -
 8 files changed, 747 insertions(+), 72 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index d21b9d9..0a9acf4 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleHandle, ShuffleWriteProcessor}
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.Utils

 /**
  * :: DeveloperApi ::
@@ -96,12 +97,31 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
     shuffleId, this)

+  // By default, shuffle merge is enabled for ShuffleDependency if push based shuffle
+  // is enabled
+  private[this] var _shuffleMergeEnabled =
+    Utils.isPushBasedShuffleEnabled(rdd.sparkContext.getConf) &&
+    // TODO: SPARK-35547: Push based shuffle is currently unsupported for Barrier stages
+    !rdd.isBarrier()
+
+  private[spark] def setShuffleMergeEnabled(shuffleMergeEnabled: Boolean): Unit = {
+    _shuffleMergeEnabled = shuffleMergeEnabled
+  }
+
+  def shuffleMergeEnabled: Boolean = _shuffleMergeEnabled
+
   /**
    * Stores the location of the list of chosen external shuffle services for handling the
    * shuffle merge requests from mappers in this shuffle map stage.
    */
   private[spark] var mergerLocs: Seq[BlockManagerId] = Nil

+  /**
+   * Stores the information about whether the shuffle merge is finalized for the shuffle map stage
+   * associated with this shuffle dependency
+   */
+  private[this] var _shuffleMergedFinalized: Boolean = false
+
   def setMergerLocs(mergerLocs: Seq[BlockManagerId]): Unit = {
     if (mergerLocs != null) {
       this.mergerLocs = mergerLocs
@@ -110,6 +130,24 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](

   def getMergerLocs: Seq[BlockManagerId] = mergerLocs

+  private[spark] def markShuffleMergeFinalized(): Unit = {
+    _shuffleMergedFinalized = true
+  }
+
+  /**
+   * Returns true if push-based shuffle is disabled for this stage or empty RDD,
+   * or if the shuffle merge for this stage is finalized, i.e. the shuffle merge
+   * results for all partitions are available.
+   */
+  def shuffleMergeFinalized: Boolean = {
+    // Empty RDD
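To make the finalization gate concrete, here is a minimal standalone sketch mirroring (but not copying) the `ShuffleDependency` flags visible in the diff above. The names are illustrative, not Spark's actual API surface: a stage's merge state is trivially "finalized" when push-based shuffle is off, and flips once all merge statuses have been collected, which is what lets the child (reduce) stage start.

```scala
// Simplified stand-in for the merge-finalization flags on ShuffleDependency.
class ShuffleMergeState(pushBasedShuffleEnabled: Boolean) {
  @volatile private var mergeFinalized = false

  // Called by the scheduler once all shuffle mergers have reported their
  // completed merge statuses for this stage.
  def markFinalized(): Unit = mergeFinalized = true

  // The reduce stage may start when this returns true: either push-based
  // shuffle is disabled (nothing to wait for) or finalization has completed.
  def finalized: Boolean = !pushBasedShuffleEnabled || mergeFinalized
}
```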
[spark] branch branch-3.0 updated: [SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4fed690  [SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions

4fed690 is described below

commit 4fed690e0190403200ea820887e93e0f7f0aa693
Author: Kousuke Saruta
AuthorDate: Fri Jun 11 01:20:35 2021 +0800

    [SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions

    ### What changes were proposed in this pull request?
    This PR fixes an issue where `Dataset.observe` doesn't work if `CollectMetricsExec` in a task handles multiple partitions. If `coalesce` follows `observe` and the number of partitions shrinks after `coalesce`, `CollectMetricsExec` can handle multiple partitions in a task.

    ### Why are the changes needed?
    The current implementation of `CollectMetricsExec` doesn't consider the case where it handles multiple partitions. Because a new `updater` is created for each partition even though those partitions belong to the same task, `collector.setState(updater)` raises an assertion error.

    This is a simple reproducible example:
    ```
    $ bin/spark-shell --master "local[1]"
    scala> spark.range(1, 4, 1, 3).observe("my_event", count($"id").as("count_val")).coalesce(2).collect
    ```
    ```
    java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:208)
        at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
        at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
        at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
        at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125)
        at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
        at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
    ```

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    New test.

    Closes #32786 from sarutak/fix-collectmetricsexec.

    Authored-by: Kousuke Saruta
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 44b695fbb06b0d89783b4838941c68543c5a5c8b)
    Signed-off-by: Wenchen Fan
---
 .../sql/execution/AggregatingAccumulator.scala     | 16 +-
 .../spark/sql/execution/CollectMetricsExec.scala   |  6 +++-
 .../spark/sql/util/DataFrameCallbackSuite.scala    | 34 ++
 3 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
index 9807b5d..68a3604 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
@@ -33,7 +33,7 @@ class AggregatingAccumulator private(
     bufferSchema: Seq[DataType],
     initialValues: Seq[Expression],
     updateExpressions: Seq[Expression],
-    @transient private val mergeExpressions: Seq[Expression],
+    mergeExpressions: Seq[Expression],
     @transient private val resultExpressions: Seq[Expression],
     imperatives: Array[ImperativeAggregate],
     typedImperatives: Array[TypedImperativeAggregate[_]],
@@ -95,13 +95,14 @@ class AggregatingAccumulator private(

   /**
    * Driver side operations like `merge` and `value` are executed in the DAGScheduler thread. This
-   * thread does not have a SQL configuration so we attach our own here. Note that we can't (and
-   * shouldn't) call `merge` or `value` on an accumulator originating from an executor so we just
-   * return a default value here.
+   * thread does not have a SQL configuration so we attach our own here.
    */
-  private[this] def withSQLConf[T](default: => T)(body: => T): T = {
+  private[this] def withSQLConf[T](canRunOnExecutor: Boolean, default: => T)(body: => T): T = {
     if (conf != null) {
+      // When we can reach here, we are on the driver side.
       SQLConf.withExistingConf(conf)(body)
+    } else if (canRunOnExecutor) {
+      body
     } else {
       default
     }
@@ -147,7 +148,8 @@ class AggregatingAccumulator private(
     }
   }

-  override def merge(other: AccumulatorV2[InternalRow, InternalRow]): Unit = withSQLConf(()) {
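For context, here is a hedged spark-shell style sketch of how the observed metrics from `Dataset.observe` are consumed in batch queries. A SparkSession named `spark` is assumed to be in scope; `QueryExecutionListener` and `QueryExecution.observedMetrics` are the standard Spark 3.x APIs, but the metric handling below is only illustrative.

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.util.QueryExecutionListener
import spark.implicits._

spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    // Metrics are published per named observation point ("my_event" below).
    qe.observedMetrics.get("my_event").foreach(row => println(s"count_val = ${row.getLong(0)}"))
  }
  override def onFailure(funcName: String, qe: QueryExecution, error: Exception): Unit = ()
})

// The repro from the commit message: with this fix, the coalesce below the
// observe no longer makes CollectMetricsExec trip the accumulator assertion.
spark.range(1, 4, 1, 3)
  .observe("my_event", count($"id").as("count_val"))
  .coalesce(2)
  .collect()
```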
[spark] branch branch-3.1 updated: [SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 0d4a2e6  [SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions

0d4a2e6 is described below

commit 0d4a2e67677cea4e144ab14480f321068fc00961
Author: Kousuke Saruta
AuthorDate: Fri Jun 11 01:20:35 2021 +0800

    [SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions

    ### What changes were proposed in this pull request?
    This PR fixes an issue where `Dataset.observe` doesn't work if `CollectMetricsExec` in a task handles multiple partitions. If `coalesce` follows `observe` and the number of partitions shrinks after `coalesce`, `CollectMetricsExec` can handle multiple partitions in a task.

    ### Why are the changes needed?
    The current implementation of `CollectMetricsExec` doesn't consider the case where it handles multiple partitions. Because a new `updater` is created for each partition even though those partitions belong to the same task, `collector.setState(updater)` raises an assertion error.

    This is a simple reproducible example:
    ```
    $ bin/spark-shell --master "local[1]"
    scala> spark.range(1, 4, 1, 3).observe("my_event", count($"id").as("count_val")).coalesce(2).collect
    ```
    ```
    java.lang.AssertionError: assertion failed
        at scala.Predef$.assert(Predef.scala:208)
        at org.apache.spark.sql.execution.AggregatingAccumulator.setState(AggregatingAccumulator.scala:204)
        at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2(CollectMetricsExec.scala:72)
        at org.apache.spark.sql.execution.CollectMetricsExec.$anonfun$doExecute$2$adapted(CollectMetricsExec.scala:71)
        at org.apache.spark.TaskContext$$anon$1.onTaskCompletion(TaskContext.scala:125)
        at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124)
        at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135)
    ```

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    New test.

    Closes #32786 from sarutak/fix-collectmetricsexec.

    Authored-by: Kousuke Saruta
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 44b695fbb06b0d89783b4838941c68543c5a5c8b)
    Signed-off-by: Wenchen Fan
---
 .../sql/execution/AggregatingAccumulator.scala     | 16 +-
 .../spark/sql/execution/CollectMetricsExec.scala   |  6 +++-
 .../spark/sql/util/DataFrameCallbackSuite.scala    | 34 ++
 3 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
index 94e159c..0fa4e6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AggregatingAccumulator.scala
@@ -33,7 +33,7 @@ class AggregatingAccumulator private(
     bufferSchema: Seq[DataType],
     initialValues: Seq[Expression],
     updateExpressions: Seq[Expression],
-    @transient private val mergeExpressions: Seq[Expression],
+    mergeExpressions: Seq[Expression],
     @transient private val resultExpressions: Seq[Expression],
     imperatives: Array[ImperativeAggregate],
     typedImperatives: Array[TypedImperativeAggregate[_]],
@@ -95,13 +95,14 @@ class AggregatingAccumulator private(

   /**
    * Driver side operations like `merge` and `value` are executed in the DAGScheduler thread. This
-   * thread does not have a SQL configuration so we attach our own here. Note that we can't (and
-   * shouldn't) call `merge` or `value` on an accumulator originating from an executor so we just
-   * return a default value here.
+   * thread does not have a SQL configuration so we attach our own here.
    */
-  private[this] def withSQLConf[T](default: => T)(body: => T): T = {
+  private[this] def withSQLConf[T](canRunOnExecutor: Boolean, default: => T)(body: => T): T = {
     if (conf != null) {
+      // When we can reach here, we are on the driver side.
       SQLConf.withExistingConf(conf)(body)
+    } else if (canRunOnExecutor) {
+      body
     } else {
       default
     }
@@ -147,7 +148,8 @@ class AggregatingAccumulator private(
     }
   }

-  override def merge(other: AccumulatorV2[InternalRow, InternalRow]): Unit = withSQLConf(()) {
[spark] branch master updated (e2e3fe7 -> 44b695f)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from e2e3fe7  [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values
 add 44b695f  [SPARK-35296][SQL] Allow Dataset.observe to work even if CollectMetricsExec in a task handles multiple partitions

No new revisions were added by this update.

Summary of changes:
 .../sql/execution/AggregatingAccumulator.scala   | 16 +-
 .../spark/sql/execution/CollectMetricsExec.scala |  6 +++-
 .../spark/sql/util/DataFrameCallbackSuite.scala  | 34 ++
 3 files changed, 48 insertions(+), 8 deletions(-)
[spark] branch branch-3.0 updated: [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f3ba9d9  [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values

f3ba9d9 is described below

commit f3ba9d9408352e6b0ac6d1fc02d4d9c3a91b5952
Author: Emil Ejbyfeldt
AuthorDate: Thu Jun 10 09:37:27 2021 -0700

    [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values

    ### What changes were proposed in this pull request?
    Use the key/value LambdaFunction to convert the elements instead of using CatalystTypeConverters.createToScalaConverter. This is how it is done in MapObjects, and that correctly handles Arrays with case classes.

    ### Why are the changes needed?
    Before these changes, the added test cases would fail with the following:
    ```
    [info] - encode/decode for map with case class as value: Map(1 -> IntAndString(1,a)) (interpreted path) *** FAILED *** (64 milliseconds)
    [info] Encoded/Decoded data does not match input data
    [info]
    [info] in: Map(1 -> IntAndString(1,a))
    [info] out: Map(1 -> [1,a])
    [info] types: scala.collection.immutable.Map$Map1
    [info]
    [info] Encoded Data: [org.apache.spark.sql.catalyst.expressions.UnsafeMapData5ecf5d9e]
    [info] Schema: value#823
    [info] root
    [info] -- value: map (nullable = true)
    [info]    |-- key: integer
    [info]    |-- value: struct (valueContainsNull = true)
    [info]    |    |-- i: integer (nullable = false)
    [info]    |    |-- s: string (nullable = true)
    [info]
    [info]
    [info] fromRow Expressions:
    [info] catalysttoexternalmap(lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179), if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders [...]
    [info] :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
    [info] :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
    [info] :- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info] :- if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
    [info] :  :- isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))
    [info] :  :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info] :  :- null
    [info] :  +- newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
    [info] :     :- assertnotnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i)
    [info] :     :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i
    [info] :     :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info] :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s.toString
    [info] :        +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s
    [info] :           +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info] +- input[0, map>, true] (ExpressionEncoderSuite.scala:627)
    ```
    So using a map with case classes for keys or values together with the interpreted path would incorrectly deserialize data from the catalyst representation.

    ### Does this PR introduce _any_ user-facing change?
    Yes, it fixes the bug.

    ### How was this patch tested?
    Existing and new unit tests in the ExpressionEncoderSuite.

    Closes #32783 from eejbyfeldt/fix-interpreted-path-for-map-with-case-classes.

    Authored-by: Emil Ejbyfeldt
    Signed-off-by: Liang-Chi Hsieh
    (cherry picked from commit e2e3fe77823387f6d4164eede05bf077b4235c87)
    Signed-off-by: Liang-Chi Hsieh
---
 .../spark/sql/catalyst/expressions/objects/objects.scala | 14
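A hedged user-level illustration of the bug class fixed here: a round trip of a Map with a case-class value. This is a spark-shell style sketch, with `spark.implicits._` assumed to be importable from an in-scope SparkSession.

```scala
import spark.implicits._

case class IntAndString(i: Int, s: String)

// Encode to the catalyst representation and decode back. Before this fix, the
// interpreted deserialization path could hand back the raw catalyst row,
// e.g. Map(1 -> [1,a]) instead of Map(1 -> IntAndString(1,a)).
val ds = Seq(Map(1 -> IntAndString(1, "a"))).toDS()
ds.collect().foreach(println) // expected: Map(1 -> IntAndString(1,a))
```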
[spark] branch branch-3.1 updated: [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 70c322a  [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values

70c322a is described below

commit 70c322ad041511ded6e531d92ffc64c11bfdc378
Author: Emil Ejbyfeldt
AuthorDate: Thu Jun 10 09:37:27 2021 -0700

    [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values

    ### What changes were proposed in this pull request?
    Use the key/value LambdaFunction to convert the elements instead of using CatalystTypeConverters.createToScalaConverter. This is how it is done in MapObjects, and that correctly handles Arrays with case classes.

    ### Why are the changes needed?
    Before these changes, the added test cases would fail with the following:
    ```
    [info] - encode/decode for map with case class as value: Map(1 -> IntAndString(1,a)) (interpreted path) *** FAILED *** (64 milliseconds)
    [info] Encoded/Decoded data does not match input data
    [info]
    [info] in: Map(1 -> IntAndString(1,a))
    [info] out: Map(1 -> [1,a])
    [info] types: scala.collection.immutable.Map$Map1
    [info]
    [info] Encoded Data: [org.apache.spark.sql.catalyst.expressions.UnsafeMapData5ecf5d9e]
    [info] Schema: value#823
    [info] root
    [info] -- value: map (nullable = true)
    [info]    |-- key: integer
    [info]    |-- value: struct (valueContainsNull = true)
    [info]    |    |-- i: integer (nullable = false)
    [info]    |    |-- s: string (nullable = true)
    [info]
    [info]
    [info] fromRow Expressions:
    [info] catalysttoexternalmap(lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179), if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders [...]
    [info] :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
    [info] :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
    [info] :- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info] :- if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
    [info] :  :- isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))
    [info] :  :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info] :  :- null
    [info] :  +- newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
    [info] :     :- assertnotnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i)
    [info] :     :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i
    [info] :     :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info] :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s.toString
    [info] :        +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s
    [info] :           +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
    [info] +- input[0, map>, true] (ExpressionEncoderSuite.scala:627)
    ```
    So using a map with case classes for keys or values together with the interpreted path would incorrectly deserialize data from the catalyst representation.

    ### Does this PR introduce _any_ user-facing change?
    Yes, it fixes the bug.

    ### How was this patch tested?
    Existing and new unit tests in the ExpressionEncoderSuite.

    Closes #32783 from eejbyfeldt/fix-interpreted-path-for-map-with-case-classes.

    Authored-by: Emil Ejbyfeldt
    Signed-off-by: Liang-Chi Hsieh
    (cherry picked from commit e2e3fe77823387f6d4164eede05bf077b4235c87)
    Signed-off-by: Liang-Chi Hsieh
---
 .../spark/sql/catalyst/expressions/objects/objects.scala | 14
[spark] branch master updated (4180692 -> e2e3fe7)
This is an automated email from the ASF dual-hosted git repository.

viirya pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 4180692  [SPARK-35711][SQL] Support casting of timestamp without time zone to timestamp type
 add e2e3fe7  [SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/expressions/objects/objects.scala | 14 ++
 .../sql/catalyst/encoders/ExpressionEncoderSuite.scala   |  5 +
 2 files changed, 11 insertions(+), 8 deletions(-)
[spark] branch master updated (88f1d82 -> 4180692)
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 88f1d82  [SPARK-34524][SQL][FOLLOWUP] Remove unused checkAlterTablePartition in CheckAnalysis.scala
 add 4180692  [SPARK-35711][SQL] Support casting of timestamp without time zone to timestamp type

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/expressions/Cast.scala      |  5
 .../spark/sql/catalyst/expressions/CastSuite.scala | 32 ++
 2 files changed, 26 insertions(+), 11 deletions(-)
[spark] branch master updated (5280f02 -> 88f1d82)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 5280f02  [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery
 add 88f1d82  [SPARK-34524][SQL][FOLLOWUP] Remove unused checkAlterTablePartition in CheckAnalysis.scala

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/analysis/CheckAnalysis.scala | 22 +-
 1 file changed, 1 insertion(+), 21 deletions(-)
[spark] branch branch-3.0 updated: [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c8d0bd0  [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery

c8d0bd0 is described below

commit c8d0bd07fc34861a802f176c35a3beb8f8b8d375
Author: Fu Chen
AuthorDate: Thu Jun 10 15:32:10 2021 +0800

    [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery

    Use `UnresolvedHint.resolved = child.resolved` instead of `UnresolvedHint.resolved = false`, so that a plan containing an `UnresolvedHint` child can be optimized by the rules in batch `Resolution`.

    For instance, before this PR, the following plan can't be optimized by `ResolveReferences`:
    ```
    !'Project [*]
    +- SubqueryAlias __auto_generated_subquery_name
       +- UnresolvedHint use_hash
          +- Project [42 AS 42#10]
             +- OneRowRelation
    ```

    This fixes the hint-in-subquery bug. No user-facing change. Tested with a new test.

    Closes #32841 from cfmcgrady/SPARK-35673.

    Authored-by: Fu Chen
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 5280f02747eed9849e4a64562d38aee11e21616f)
    Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  3 +++
 .../spark/sql/catalyst/plans/logical/hints.scala   |  5 -
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 16 +
 .../spark/sql/SparkSessionExtensionSuite.scala     | 26 ++
 4 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 2887967..fe12dd4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -105,6 +105,9 @@ trait CheckAnalysis extends PredicateHelper {
       case u: UnresolvedRelation =>
         u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

+      case u: UnresolvedHint =>
+        u.failAnalysis(s"Hint not found: ${u.name}")
+
       case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) =>
         failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index a325b61..1202d7d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -30,7 +30,10 @@ import org.apache.spark.util.Utils
 case class UnresolvedHint(name: String, parameters: Seq[Any], child: LogicalPlan)
   extends UnaryNode {

-  override lazy val resolved: Boolean = false
+  // we need it to be resolved so that the analyzer can continue to analyze the rest of the query
+  // plan.
+  override lazy val resolved: Boolean = child.resolved
+
   override def output: Seq[Attribute] = child.output
 }

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index afc9780..348c282 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -722,4 +722,20 @@ class AnalysisErrorSuite extends AnalysisTest {
       assertAnalysisError(plan, s"Correlated column is not allowed in predicate ($msg)" :: Nil)
     }
   }
+
+  test("SPARK-35673: fail if the plan still contains UnresolvedHint after analysis") {
+    val hintName = "some_random_hint_that_does_not_exist"
+    val plan = UnresolvedHint(hintName, Seq.empty,
+      Project(Alias(Literal(1), "x")() :: Nil, OneRowRelation())
+    )
+    assert(plan.resolved)
+
+    val error = intercept[AnalysisException] {
+      SimpleAnalyzer.checkAnalysis(plan)
+    }
+    assert(error.message.contains(s"Hint not found: ${hintName}"))
+
+    // UnresolvedHint be removed by batch `Remove Unresolved Hints`
+    assertAnalysisSuccess(plan, true)
+  }
 }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index e5e8bc6..b30d579 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -349,6 +349,32 @@ class SparkSessionExtensionSuite extends SparkFunSuite {
       stop(sess
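A hedged spark-shell style sketch of the scenario this patch addresses. Before the fix, a hint inside a subquery left an unresolvable `UnresolvedHint` node that blocked `ResolveReferences` from resolving the outer `SELECT *`; with `resolved = child.resolved`, analysis proceeds. The hint choice below is illustrative (a built-in hint standing in for the user-defined `use_hash` from the commit message), and a SparkSession named `spark` is assumed.

```scala
// The shape from the commit message: a hinted projection wrapped in a subquery.
// With the fix, the outer star can be resolved even though the hint node is
// still sitting in the tree when ResolveReferences runs.
spark.sql("SELECT * FROM (SELECT /*+ REPARTITION(2) */ 42 AS v)").show()

// An unrecognized hint that somehow survives analysis now fails with
// "Hint not found: ..." (the new CheckAnalysis rule), though in practice the
// "Remove Unresolved Hints" batch drops it with a warning first.
spark.sql("SELECT * FROM (SELECT /*+ some_random_hint_that_does_not_exist */ 42 AS v)").show()
```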
[spark] branch branch-3.1 updated: [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new e0b074a  [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery

e0b074a is described below

commit e0b074a915ed39f0e51a21936787943dea8ae39f
Author: Fu Chen
AuthorDate: Thu Jun 10 15:32:10 2021 +0800

    [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery

    Use `UnresolvedHint.resolved = child.resolved` instead of `UnresolvedHint.resolved = false`, so that a plan containing an `UnresolvedHint` child can be optimized by the rules in batch `Resolution`.

    For instance, before this PR, the following plan can't be optimized by `ResolveReferences`:
    ```
    !'Project [*]
    +- SubqueryAlias __auto_generated_subquery_name
       +- UnresolvedHint use_hash
          +- Project [42 AS 42#10]
             +- OneRowRelation
    ```

    This fixes the hint-in-subquery bug. No user-facing change. Tested with a new test.

    Closes #32841 from cfmcgrady/SPARK-35673.

    Authored-by: Fu Chen
    Signed-off-by: Wenchen Fan
    (cherry picked from commit 5280f02747eed9849e4a64562d38aee11e21616f)
    Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  3 +++
 .../spark/sql/catalyst/plans/logical/hints.scala   |  5 -
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 16 +
 .../spark/sql/SparkSessionExtensionSuite.scala     | 26 ++
 4 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 3dfe7f4..84de9b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -112,6 +112,9 @@ trait CheckAnalysis extends PredicateHelper {
       case u: UnresolvedRelation =>
         u.failAnalysis(s"Table or view not found: ${u.multipartIdentifier.quoted}")

+      case u: UnresolvedHint =>
+        u.failAnalysis(s"Hint not found: ${u.name}")
+
       case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) =>
         failAnalysis(s"Table not found: ${u.multipartIdentifier.quoted}")

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index 4b5e278..1b72d21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -29,7 +29,10 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 case class UnresolvedHint(name: String, parameters: Seq[Any], child: LogicalPlan)
   extends UnaryNode {

-  override lazy val resolved: Boolean = false
+  // we need it to be resolved so that the analyzer can continue to analyze the rest of the query
+  // plan.
+  override lazy val resolved: Boolean = child.resolved
+
   override def output: Seq[Attribute] = child.output
 }

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 20ba9c5..d14c221 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -724,4 +724,20 @@ class AnalysisErrorSuite extends AnalysisTest {
       assertAnalysisError(plan, s"Correlated column is not allowed in predicate ($msg)" :: Nil)
     }
   }
+
+  test("SPARK-35673: fail if the plan still contains UnresolvedHint after analysis") {
+    val hintName = "some_random_hint_that_does_not_exist"
+    val plan = UnresolvedHint(hintName, Seq.empty,
+      Project(Alias(Literal(1), "x")() :: Nil, OneRowRelation())
+    )
+    assert(plan.resolved)
+
+    val error = intercept[AnalysisException] {
+      SimpleAnalyzer.checkAnalysis(plan)
+    }
+    assert(error.message.contains(s"Hint not found: ${hintName}"))
+
+    // UnresolvedHint be removed by batch `Remove Unresolved Hints`
+    assertAnalysisSuccess(plan, true)
+  }
 }

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 35d2513..03514d85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -355,6 +355,32 @@ class SparkSessionExtensionSuite extends Spar
[spark] branch master updated (cadd3a0 -> 5280f02)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from cadd3a0  [SPARK-35474] Enable disallow_untyped_defs mypy check for pyspark.pandas.indexing
 add 5280f02  [SPARK-35673][SQL] Fix user-defined hint and unrecognized hint in subquery

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  3 +++
 .../spark/sql/catalyst/plans/logical/hints.scala   |  5 -
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 16 +
 .../spark/sql/SparkSessionExtensionSuite.scala     | 26 ++
 4 files changed, 49 insertions(+), 1 deletion(-)