[spark] branch master updated (f7be024 -> ce1f97f)
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from f7be024  [SPARK-37480][K8S][DOC] Sync Kubernetes configuration to latest in running-on-k8s.md
     add ce1f97f  [SPARK-37326][SQL] Support TimestampNTZ in CSV data source

No new revisions were added by this update.

Summary of changes:
 docs/sql-data-sources-csv.md                        |  12 +-
 .../spark/sql/catalyst/csv/CSVInferSchema.scala     |  24 +++
 .../apache/spark/sql/catalyst/csv/CSVOptions.scala  |   4 +
 .../sql/catalyst/csv/UnivocityGenerator.scala       |   2 +-
 .../spark/sql/catalyst/csv/UnivocityParser.scala    |   4 +-
 .../spark/sql/catalyst/util/DateTimeUtils.scala     |  32 ++-
 .../sql/catalyst/util/TimestampFormatter.scala      |  36 +++-
 .../spark/sql/errors/QueryExecutionErrors.scala     |   8 +-
 .../sql/catalyst/util/DateTimeUtilsSuite.scala      |  12 ++
 .../org/apache/spark/sql/CsvFunctionsSuite.scala    |  11 ++
 .../sql/execution/datasources/csv/CSVSuite.scala    | 216 -
 11 files changed, 331 insertions(+), 30 deletions(-)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-37480][K8S][DOC] Sync Kubernetes configuration to latest in running-on-k8s.md
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f7be024  [SPARK-37480][K8S][DOC] Sync Kubernetes configuration to latest in running-on-k8s.md
f7be024 is described below

commit f7be0248f84a67d4fa839dc2d6e496c5a7d96830
Author: Yikun Jiang
AuthorDate: Tue Nov 30 22:49:32 2021 -0800

    [SPARK-37480][K8S][DOC] Sync Kubernetes configuration to latest in running-on-k8s.md

    ### What changes were proposed in this pull request?
    Sync Kubernetes configurations to latest in doc

    ### Why are the changes needed?
    Configurations in docs/running-on-kubernetes.md are not up to date

    ### Does this PR introduce _any_ user-facing change?
    No, docs only

    ### How was this patch tested?
    CI passed

    Closes #34734 from Yikun/SPARK-37480.

    Authored-by: Yikun Jiang
    Signed-off-by: Dongjoon Hyun
---
 docs/running-on-kubernetes.md | 175 --
 1 file changed, 170 insertions(+), 5 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index d32861b..58d6dda 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -592,6 +592,7 @@ See the [configuration page](configuration.html) for information on Spark config
 IfNotPresent
 Container image pull policy used when pulling images within Kubernetes.
+Valid values are Always, Never, and IfNotPresent.
 2.3.0
@@ -780,6 +781,15 @@ See the [configuration page](configuration.html) for information on Spark config
 2.3.0
+ spark.kubernetes.authenticate.executor.serviceAccountName
+ (value of spark.kubernetes.authenticate.driver.serviceAccountName)
+
+Service account that is used when running the executor pod.
+If this parameter is not setup, the fallback logic will use the driver's service account.
+
+ 3.1.0
+
+
 spark.kubernetes.authenticate.caCertFile
 (none)
@@ -925,6 +935,14 @@ See the [configuration page](configuration.html) for information on Spark config
 2.3.0
+ spark.kubernetes.executor.apiPollingInterval
+ 30s
+
+Interval between polls against the Kubernetes API server to inspect the state of executors.
+
+ 2.4.0
+
+
 spark.kubernetes.driver.request.cores
 (none)
@@ -1232,7 +1250,7 @@ See the [configuration page](configuration.html) for information on Spark config
 spark.kubernetes.executor.checkAllContainers
- false
+ false
 Specify whether executor pods should be check all containers (including sidecars) or only the executor container when determining the pod status.
@@ -1240,7 +1258,7 @@ See the [configuration page](configuration.html) for information on Spark config
 spark.kubernetes.submission.connectionTimeout
- 1
+ 1
 Connection timeout in milliseconds for the kubernetes client to use for starting the driver.
@@ -1248,7 +1266,7 @@ See the [configuration page](configuration.html) for information on Spark config
 spark.kubernetes.submission.requestTimeout
- 1
+ 1
 Request timeout in milliseconds for the kubernetes client to use for starting the driver.
@@ -1256,7 +1274,7 @@ See the [configuration page](configuration.html) for information on Spark config
 spark.kubernetes.driver.connectionTimeout
- 1
+ 1
 Connection timeout in milliseconds for the kubernetes client in driver to use when requesting executors.
@@ -1264,7 +1282,7 @@ See the [configuration page](configuration.html) for information on Spark config
 spark.kubernetes.driver.requestTimeout
- 1
+ 1
 Request timeout in milliseconds for the kubernetes client in driver to use when requesting executors.
@@ -1279,6 +1297,14 @@ See the [configuration page](configuration.html) for information on Spark config
 3.0.0
+ spark.kubernetes.dynamicAllocation.deleteGracePeriod
+ 5s
+
+How long to wait for executors to shut down gracefully before a forceful kill.
+
+ 3.0.0
+
+
 spark.kubernetes.file.upload.path
 (none)
@@ -1322,6 +1348,145 @@ See the [configuration page](configuration.html) for information on Spark config
 3.3.0
+
+ spark.kubernetes.configMap.maxSize
+ 1572864
+
+Max size limit for a config map.
+This is configurable as per https://etcd.io/docs/latest/dev-guide/limit/ limit on k8s server end.
+
+ 3.1.0
+
+
+ spark.kubernetes.executor.missingPodDetectDelta
+ 30s
+
+When a registered executor's POD is missing from the Kubernetes API server's polled
+list of PODs then this delta time is taken as the accepted time difference between the
+registration time and the time of the polling. After this time the POD is considered
+missing from the cluster and the executor will be re
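As a rough illustration of how the settings documented in this diff might be passed on the command line, here is a minimal Scala sketch that turns key/value pairs into `spark-submit --conf` arguments. The values are the defaults quoted in the diff above; the object name `K8sConfSketch` and the helper `toSubmitArgs` are hypothetical and not part of Spark.

```scala
// Hypothetical helper, not part of Spark: renders settings as spark-submit arguments.
object K8sConfSketch {
  // Valid pull policies as listed in the documentation change above.
  val validPullPolicies: Set[String] = Set("Always", "Never", "IfNotPresent")

  val pullPolicyKey = "spark.kubernetes.container.image.pullPolicy"

  // Defaults as quoted in the documentation diff.
  val documentedDefaults: Seq[(String, String)] = Seq(
    pullPolicyKey -> "IfNotPresent",
    "spark.kubernetes.executor.apiPollingInterval" -> "30s",
    "spark.kubernetes.dynamicAllocation.deleteGracePeriod" -> "5s",
    "spark.kubernetes.configMap.maxSize" -> "1572864",
    "spark.kubernetes.executor.missingPodDetectDelta" -> "30s"
  )

  // Rejects an unknown pull policy, then emits "--conf key=value" pairs in order.
  def toSubmitArgs(conf: Seq[(String, String)]): Seq[String] = {
    conf.foreach { case (key, value) =>
      if (key == pullPolicyKey) {
        require(validPullPolicies.contains(value), s"Invalid pull policy: $value")
      }
    }
    conf.flatMap { case (key, value) => Seq("--conf", s"$key=$value") }
  }
}
```

Calling `K8sConfSketch.toSubmitArgs(K8sConfSketch.documentedDefaults)` yields a flat argument list that could be appended to a `spark-submit` invocation.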
[spark] branch master updated (d61c2f4 -> e7fa289)
dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from d61c2f4  [SPARK-37490][SQL] Show extra hint if analyzer fails due to ANSI type coercion
     add e7fa289  [SPARK-37376][SQL] Introduce a new DataSource V2 interface HasPartitionKey

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/connector/read/HasPartitionKey.java  | 52 ++
 .../sql/connector/catalog/InMemoryTable.scala      | 22 +
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 30 +
 3 files changed, 96 insertions(+), 8 deletions(-)
 create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/HasPartitionKey.java
[spark] branch master updated: [SPARK-37490][SQL] Show extra hint if analyzer fails due to ANSI type coercion
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new d61c2f4  [SPARK-37490][SQL] Show extra hint if analyzer fails due to ANSI type coercion
d61c2f4 is described below

commit d61c2f45c3c1fa90aef7f7aff0d9f292edfd3083
Author: Gengliang Wang
AuthorDate: Wed Dec 1 12:45:04 2021 +0800

    [SPARK-37490][SQL] Show extra hint if analyzer fails due to ANSI type coercion

    ### What changes were proposed in this pull request?

    Show extra hint in the error message if analysis failed only with ANSI type coercion:
    ```
    To fix the error, you might need to add explicit type casts. If necessary set spark.sql.ansi.enabled to false to bypass this error.
    ```

    ### Why are the changes needed?

    Improve error message

    ### Does this PR introduce _any_ user-facing change?

    Yes, Spark will show extra hint if analyzer fails due to ANSI type coercion

    ### How was this patch tested?

    Unit tests

    Closes #34747 from gengliangwang/improveCoercionMsg.

    Authored-by: Gengliang Wang
    Signed-off-by: Gengliang Wang
---
 .../sql/catalyst/analysis/AnsiTypeCoercion.scala   |   7 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 111 +
 .../spark/sql/catalyst/analysis/TypeCoercion.scala |   4 +-
 .../sql/catalyst/rules/RuleIdCollection.scala      |   1 +
 .../resources/sql-tests/results/ansi/date.sql.out  |  12 ++-
 .../sql-tests/results/ansi/interval.sql.out        |   6 +-
 .../sql-tests/results/postgreSQL/union.sql.out     |   1 +
 7 files changed, 113 insertions(+), 29 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
index debc13b..267c2cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercion.scala
@@ -75,7 +75,7 @@ import org.apache.spark.sql.types._
 object AnsiTypeCoercion extends TypeCoercionBase {
   override def typeCoercionRules: List[Rule[LogicalPlan]] =
     WidenSetOperationTypes ::
-    CombinedTypeCoercionRule(
+    new AnsiCombinedTypeCoercionRule(
       InConversion ::
       PromoteStringLiterals ::
       DecimalPrecision ::
@@ -304,4 +304,9 @@ object AnsiTypeCoercion extends TypeCoercionBase {
       s.copy(left = newLeft, right = newRight)
     }
   }
+
+  // This is for generating a new rule id, so that we can run both default and Ansi
+  // type coercion rules against one logical plan.
+  class AnsiCombinedTypeCoercionRule(rules: Seq[TypeCoercionRule]) extends
+    CombinedTypeCoercionRule(rules)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 5bf37a2..491d525 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils, TypeUtils}
 import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -47,6 +48,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
    */
   val extendedCheckRules: Seq[LogicalPlan => Unit] = Nil

+  val DATA_TYPE_MISMATCH_ERROR = TreeNodeTag[Boolean]("dataTypeMismatchError")
+
   protected def failAnalysis(msg: String): Nothing = {
     throw new AnalysisException(msg)
   }
@@ -165,14 +168,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
           }
         }

-        val exprs = operator match {
-          // `groupingExpressions` may rely on `aggregateExpressions`, due to the GROUP BY alias
-          // feature. We should check errors in `aggregateExpressions` first.
-          case a: Aggregate => a.aggregateExpressions ++ a.groupingExpressions
-          case _ => operator.expressions
-        }
-
-        exprs.foreach(_.foreachUp {
+        getAllExpressions(operator).foreach(_.foreachUp {
           case a: Attribute if !a.resolved =>
             val missingCol = a.sql
             val candidates = operator.inputSet.toSeq.map(_.qualifiedNa
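The behaviour described in the commit message (append a hint only when analysis fails under ANSI type coercion but would succeed under the default rules) can be sketched in plain Scala. This is an illustrative sketch, not the code from the patch: `AnsiHintSketch` and `errorMessage` are hypothetical names, and the two `Option[String]` arguments stand in for the outcome of running each rule set.

```scala
// Hypothetical sketch, not the Spark implementation.
object AnsiHintSketch {
  // Hint text quoted from the commit message.
  val ansiHint: String =
    "To fix the error, you might need to add explicit type casts. " +
      "If necessary set spark.sql.ansi.enabled to false to bypass this error."

  // ansiError / defaultError: Some(message) if analysis failed under that rule set.
  def errorMessage(ansiError: Option[String], defaultError: Option[String]): Option[String] =
    (ansiError, defaultError) match {
      // Fails only with ANSI type coercion: the hint is relevant, so append it.
      case (Some(msg), None) => Some(s"$msg\n$ansiHint")
      // Fails either way: disabling ANSI mode would not help, so no hint.
      case (Some(msg), Some(_)) => Some(msg)
      // ANSI analysis succeeded: nothing to report.
      case (None, _) => None
    }
}
```

The design point the sketch tries to capture is that the hint is conditional: suggesting `spark.sql.ansi.enabled=false` is only useful when the default coercion rules would actually resolve the plan.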
[spark] branch branch-3.1 updated: [SPARK-37497][K8S] Promote `ExecutorPods[PollingSnapshot|WatchSnapshot]Source` to DeveloperApi
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 75cac1f  [SPARK-37497][K8S] Promote `ExecutorPods[PollingSnapshot|WatchSnapshot]Source` to DeveloperApi
75cac1f is described below

commit 75cac1fe0a46dbdf2ad5b741a3a49c9ab618cdce
Author: Dongjoon Hyun
AuthorDate: Tue Nov 30 18:41:18 2021 -0800

    [SPARK-37497][K8S] Promote `ExecutorPods[PollingSnapshot|WatchSnapshot]Source` to DeveloperApi

    ### What changes were proposed in this pull request?

    This PR aims to promote `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` as **stable** `DeveloperApi` in order to maintain it officially in a backward compatible way at Apache Spark 3.3.0.

    ### Why are the changes needed?

    - Since SPARK-24248 at Apache Spark 2.4.0, `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` have been used to monitor executor pods without any interface changes for over 3 years.
    - Apache Spark 3.1.1 makes `Kubernetes` module GA and provides an extensible external cluster manager framework. New `ExternalClusterManager` for K8s environment need to depend on this to monitor pods.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Manual review.

    Closes #34751 from dongjoon-hyun/SPARK-37497.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 2b044962cd6eff5a3a76f2808ee93b40bdf931df)
    Signed-off-by: Dongjoon Hyun
---
 .../cluster/k8s/ExecutorPodsPollingSnapshotSource.scala    | 13 -
 .../cluster/k8s/ExecutorPodsWatchSnapshotSource.scala      | 14 +-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
index da7fe7c..6fcb876 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
@@ -22,12 +22,21 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.JavaConverters._

 import org.apache.spark.SparkConf
+import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{ThreadUtils, Utils}

-private[spark] class ExecutorPodsPollingSnapshotSource(
+/**
+ * :: DeveloperApi ::
+ *
+ * A class used for polling K8s executor pods by ExternalClusterManagers.
+ * @since 3.1.3
+ */
+@Stable
+@DeveloperApi
+class ExecutorPodsPollingSnapshotSource(
     conf: SparkConf,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -37,6 +46,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(

   private var pollingFuture: Future[_] = _

+  @Since("3.1.3")
   def start(applicationId: String): Unit = {
     require(pollingFuture == null, "Cannot start polling more than once.")
     logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
@@ -44,6 +54,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
       new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
   }

+  @Since("3.1.3")
   def stop(): Unit = {
     if (pollingFuture != null) {
       pollingFuture.cancel(true)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
index a6749a6..7ac70b5 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
@@ -22,16 +22,27 @@ import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action

+import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils

-private[spark] class ExecutorPodsWatchSnapshotSource(
+/**
+ * :: DeveloperApi ::
+ *
+ * A
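The `start`/`stop` lifecycle visible in the diff (start at most once, poll at a fixed interval, cancel on stop) can be illustrated with a small standalone sketch built on `java.util.concurrent`. This is not the Spark class: `PollingSourceSketch` and its `poll` callback are hypothetical, and the use of `scheduleWithFixedDelay` is an assumption, since the diff does not show which scheduling call the real class makes.

```scala
import java.util.concurrent.{Executors, Future, ScheduledExecutorService, TimeUnit}

// Hypothetical stand-in for a polling snapshot source; not the Spark class.
class PollingSourceSketch(pollingIntervalMs: Long, poll: String => Unit) {
  private val executor: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  // Null until start() is called, mirroring the guard shown in the diff.
  private var pollingFuture: Future[_] = _

  def start(applicationId: String): Unit = {
    require(pollingFuture == null, "Cannot start polling more than once.")
    // Assumption: fixed-delay scheduling with the interval as the initial delay.
    pollingFuture = executor.scheduleWithFixedDelay(
      new Runnable { override def run(): Unit = poll(applicationId) },
      pollingIntervalMs, pollingIntervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    if (pollingFuture != null) {
      pollingFuture.cancel(true)
      pollingFuture = null
    }
    // The sketch owns its executor, so it shuts it down here.
    executor.shutdownNow()
  }
}
```

An external cluster manager would construct such a source once, call `start` with the application id, and call `stop` during shutdown; a second `start` call fails the `require` check.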
[spark] branch branch-3.2 updated: [SPARK-37497][K8S] Promote `ExecutorPods[PollingSnapshot|WatchSnapshot]Source` to DeveloperApi
dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new ceae41b  [SPARK-37497][K8S] Promote `ExecutorPods[PollingSnapshot|WatchSnapshot]Source` to DeveloperApi
ceae41b is described below

commit ceae41ba5cafb479cdcfc9a6a162945646a68f05
Author: Dongjoon Hyun
AuthorDate: Tue Nov 30 18:41:18 2021 -0800

    [SPARK-37497][K8S] Promote `ExecutorPods[PollingSnapshot|WatchSnapshot]Source` to DeveloperApi

    ### What changes were proposed in this pull request?

    This PR aims to promote `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` as **stable** `DeveloperApi` in order to maintain it officially in a backward compatible way at Apache Spark 3.3.0.

    ### Why are the changes needed?

    - Since SPARK-24248 at Apache Spark 2.4.0, `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` have been used to monitor executor pods without any interface changes for over 3 years.
    - Apache Spark 3.1.1 makes `Kubernetes` module GA and provides an extensible external cluster manager framework. New `ExternalClusterManager` for K8s environment need to depend on this to monitor pods.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Manual review.

    Closes #34751 from dongjoon-hyun/SPARK-37497.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 2b044962cd6eff5a3a76f2808ee93b40bdf931df)
    Signed-off-by: Dongjoon Hyun
---
 .../cluster/k8s/ExecutorPodsPollingSnapshotSource.scala    | 13 -
 .../cluster/k8s/ExecutorPodsWatchSnapshotSource.scala      | 14 +-
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
index da7fe7c..6fcb876 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala
@@ -22,12 +22,21 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.JavaConverters._

 import org.apache.spark.SparkConf
+import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{ThreadUtils, Utils}

-private[spark] class ExecutorPodsPollingSnapshotSource(
+/**
+ * :: DeveloperApi ::
+ *
+ * A class used for polling K8s executor pods by ExternalClusterManagers.
+ * @since 3.1.3
+ */
+@Stable
+@DeveloperApi
+class ExecutorPodsPollingSnapshotSource(
     conf: SparkConf,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -37,6 +46,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(

   private var pollingFuture: Future[_] = _

+  @Since("3.1.3")
   def start(applicationId: String): Unit = {
     require(pollingFuture == null, "Cannot start polling more than once.")
     logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
@@ -44,6 +54,7 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
       new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
   }

+  @Since("3.1.3")
   def stop(): Unit = {
     if (pollingFuture != null) {
       pollingFuture.cancel(true)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
index 762878c..06d942e 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
@@ -22,16 +22,27 @@ import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.{KubernetesClient, Watcher, WatcherException}
 import io.fabric8.kubernetes.client.Watcher.Action

+import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils

-private[spark] class ExecutorPodsWatchSnapshotSource(
+/**
+ * :: DeveloperApi ::
+ *
+ * A class us
[spark] branch master updated (ca25534 -> 2b04496)
dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from ca25534  [SPARK-37509][CORE] Improve Fallback Storage upload speed by avoiding S3 rate limiter
     add 2b04496  [SPARK-37497][K8S] Promote `ExecutorPods[PollingSnapshot|WatchSnapshot]Source` to DeveloperApi

No new revisions were added by this update.

Summary of changes:
 .../cluster/k8s/ExecutorPodsPollingSnapshotSource.scala    | 13 -
 .../cluster/k8s/ExecutorPodsWatchSnapshotSource.scala      | 14 +-
 2 files changed, 25 insertions(+), 2 deletions(-)
[spark] branch master updated: [SPARK-37509][CORE] Improve Fallback Storage upload speed by avoiding S3 rate limiter
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new ca25534  [SPARK-37509][CORE] Improve Fallback Storage upload speed by avoiding S3 rate limiter
ca25534 is described below

commit ca2553443977264e2e897006dc729ba61147829f
Author: Dongjoon Hyun
AuthorDate: Tue Nov 30 15:03:00 2021 -0800

    [SPARK-37509][CORE] Improve Fallback Storage upload speed by avoiding S3 rate limiter

    ### What changes were proposed in this pull request?

    This PR aims to improve `Fallback Storage` upload speed by randomizing the path in order to avoid S3 rate limiter.

    ### Why are the changes needed?

    Currently, `Fallback Storage` is using `a single prefix per shuffle`. This PR aims to randomize the upload prefixes even in a single shuffle to avoid S3 rate limiter.

    - https://aws.amazon.com/premiumsupport/knowledge-center/s3-request-limit-avoid-throttling/

    ### Does this PR introduce _any_ user-facing change?

    No. This is used internally during the runtime.

    ### How was this patch tested?

    Pass the CIs to verify read and write operations.

    To check the layout, check the uploaded path manually with the following configs.

    ```
    spark.decommission.enabled                              true
    spark.storage.decommission.enabled                      true
    spark.storage.decommission.shuffleBlocks.enabled        true
    spark.storage.decommission.fallbackStorage.path         file:///tmp/fallback/
    ```

    Start one master and worker. Connect with `spark-shell` and generate shuffle data.

    ```
    scala> sc.parallelize(1 to 11, 10).map(x => (x % 3, 1)).reduceByKey(_ + _).count()
    res0: Long = 3
    ```

    Invoke decommission and check. Since we have only one worker, the shuffle data go to the fallback storage directly.

    ```
    $ kill -PWR
    $ tree /tmp/fallback
    /tmp/fallback
    └── app-20211130135922-0001
        └── 0
            ├── 103417883
            │   └── shuffle_0_7_0.data
            ├── 1036881592
            │   └── shuffle_0_4_0.data
            ├── 1094002679
            │   └── shuffle_0_7_0.index
            ├── 1393510154
            │   └── shuffle_0_6_0.index
            ├── 1515275369
            │   └── shuffle_0_3_0.data
            ├── 1541340402
            │   └── shuffle_0_2_0.index
            ├── 1639392452
            │   └── shuffle_0_8_0.data
            ├── 1774061049
            │   └── shuffle_0_9_0.index
            ├── 1846228218
            │   └── shuffle_0_6_0.data
            ├── 1970345301
            │   └── shuffle_0_1_0.data
            ├── 2073568524
            │   └── shuffle_0_4_0.index
            ├── 227534966
            │   └── shuffle_0_2_0.data
            ├── 266114061
            │   └── shuffle_0_3_0.index
            ├── 413944309
            │   └── shuffle_0_5_0.index
            ├── 581811660
            │   └── shuffle_0_0_0.data
            ├── 705928743
            │   └── shuffle_0_5_0.data
            ├── 713451784
            │   └── shuffle_0_8_0.index
            ├── 861282032
            │   └── shuffle_0_0_0.index
            ├── 912764509
            │   └── shuffle_0_9_0.data
            └── 946172431
                └── shuffle_0_1_0.index
    ```

    Closes #34762 from dongjoon-hyun/SPARK-37509.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
---
 .../scala/org/apache/spark/storage/FallbackStorage.scala | 16 +++-
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 7613713..d137099 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -31,6 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{STORAGE_DECOMMISSION_FALLBACK_STORAGE_CLEANUP, STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
 import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
@@ -60,15 +61,17 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
     val indexFile = r.getIndexFile(shuffleId, mapId)
     if (indexFile.exists()) {
+      val hash = JavaUtils.nonNegativeHash(indexFile.getName)
       fallbackFileSystem.copyFromLocalFile(
         new Path(indexFile.getAbsolutePath),
-        new
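To make the prefix randomization concrete, the following standalone Scala sketch builds an upload path of the shape shown in the `tree` output above (`<root>/<appId>/<shuffleId>/<hash>/<fileName>`), inserting a non-negative hash of the file name as an extra directory level. It is an illustration under assumptions: `FallbackPathSketch`, `uploadPath`, and the local `nonNegativeHash` are hypothetical stand-ins, and the local hash is not guaranteed to match what `JavaUtils.nonNegativeHash` in the patch produces.

```scala
// Hypothetical sketch of a hashed upload layout; not the code from the patch.
object FallbackPathSketch {
  // Local stand-in for a non-negative hash: Int.MinValue has no positive
  // counterpart, so it is mapped to 0 instead of calling math.abs on it.
  def nonNegativeHash(name: String): Int = {
    val h = name.hashCode
    if (h != Int.MinValue) math.abs(h) else 0
  }

  // Each file gets its own hash-derived directory, so files from a single
  // shuffle no longer share one key prefix.
  def uploadPath(root: String, appId: String, shuffleId: Int, fileName: String): String = {
    val base = root.stripSuffix("/")
    s"$base/$appId/$shuffleId/${nonNegativeHash(fileName)}/$fileName"
  }
}
```

Because the hash depends only on the file name, a reader can recompute the same path later without any extra bookkeeping, which is the property that lets the layout change stay internal.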
[spark] branch branch-3.1 updated: [SPARK-37505][MESOS][TESTS] Add a log4j.properties for `mesos` module UT
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b8b5f94  [SPARK-37505][MESOS][TESTS] Add a log4j.properties for `mesos` module UT
b8b5f94 is described below

commit b8b5f94a7e0b22d221c15c6f663b316bfd645d43
Author: yangjie01
AuthorDate: Tue Nov 30 14:40:54 2021 -0800

    [SPARK-37505][MESOS][TESTS] Add a log4j.properties for `mesos` module UT

    ### What changes were proposed in this pull request?

    `scalatest-maven-plugin` configures `file:src/test/resources/log4j.properties` as the UT log configuration, so this PR adds this `log4j.properties` file to the mesos module for UT.

    ### Why are the changes needed?

    Supplement missing log4j configuration file for mesos module.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    - Pass the Jenkins or GitHub Action
    - Manual test

    **Before**

    Run

    ```
    mvn clean install -pl resource-managers/mesos -Pmesos -am -DskipTests
    mvn test -pl resource-managers/mesos -Pmesos
    ```

    will print the following log:

    ```
    log4j:ERROR Could not read configuration file from URL [file:src/test/resources/log4j.properties].
    java.io.FileNotFoundException: src/test/resources/log4j.properties (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.(FileInputStream.java:138)
        at java.io.FileInputStream.(FileInputStream.java:93)
        at sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90)
        at sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188)
        at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557)
        at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
        at org.apache.log4j.LogManager.(LogManager.java:127)
        at org.slf4j.impl.Log4jLoggerFactory.(Log4jLoggerFactory.java:66)
        at org.slf4j.impl.StaticLoggerBinder.(StaticLoggerBinder.java:72)
        at org.slf4j.impl.StaticLoggerBinder.(StaticLoggerBinder.java:45)
        at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
        at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
        at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
        at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
        at org.apache.spark.SparkFunSuite.initializeLogIfNecessary(SparkFunSuite.scala:62)
        at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:102)
        at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:101)
        at org.apache.spark.SparkFunSuite.initializeLogIfNecessary(SparkFunSuite.scala:62)
        at org.apache.spark.internal.Logging.log(Logging.scala:49)
        at org.apache.spark.internal.Logging.log$(Logging.scala:47)
        at org.apache.spark.SparkFunSuite.log(SparkFunSuite.scala:62)
        at org.apache.spark.SparkFunSuite.(SparkFunSuite.scala:74)
        at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackendSuite.(MesosCoarseGrainedSchedulerBackendSuite.scala:43)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at org.scalatest.tools.DiscoverySuite$.getSuiteInstance(DiscoverySuite.scala:66)
        at org.scalatest.tools.DiscoverySuite.$anonfun$nestedSuites$1(DiscoverySuite.scala:38)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.scalatest.tools.DiscoverySuite.(DiscoveryS
[spark] branch branch-3.2 updated: [SPARK-37505][MESOS][TESTS] Add a log4j.properties for `mesos` module UT
dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 5f1ad08  [SPARK-37505][MESOS][TESTS] Add a log4j.properties for `mesos` module UT
5f1ad08 is described below

commit 5f1ad08a020f9abc0f5cb8b8ef3660ccb0763dd7
Author: yangjie01
AuthorDate: Tue Nov 30 14:40:54 2021 -0800

    [SPARK-37505][MESOS][TESTS] Add a log4j.properties for `mesos` module UT

    ### What changes were proposed in this pull request?

    `scalatest-maven-plugin` configures `file:src/test/resources/log4j.properties` as the UT log configuration, so this PR adds this `log4j.properties` file to the mesos module for UT.

    ### Why are the changes needed?

    Supplement missing log4j configuration file for mesos module.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    - Pass the Jenkins or GitHub Action
    - Manual test

    **Before**

    Run

    ```
    mvn clean install -pl resource-managers/mesos -Pmesos -am -DskipTests
    mvn test -pl resource-managers/mesos -Pmesos
    ```

    will print the following log:

    ```
    log4j:ERROR Could not read configuration file from URL [file:src/test/resources/log4j.properties].
    java.io.FileNotFoundException: src/test/resources/log4j.properties (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.(FileInputStream.java:138)
        at java.io.FileInputStream.(FileInputStream.java:93)
        at sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90)
        at sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188)
        at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557)
        at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
        at org.apache.log4j.LogManager.(LogManager.java:127)
        at org.slf4j.impl.Log4jLoggerFactory.(Log4jLoggerFactory.java:66)
        at org.slf4j.impl.StaticLoggerBinder.(StaticLoggerBinder.java:72)
        at org.slf4j.impl.StaticLoggerBinder.(StaticLoggerBinder.java:45)
        at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222)
        at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127)
        at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111)
        at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105)
        at org.apache.spark.SparkFunSuite.initializeLogIfNecessary(SparkFunSuite.scala:62)
        at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:102)
        at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:101)
        at org.apache.spark.SparkFunSuite.initializeLogIfNecessary(SparkFunSuite.scala:62)
        at org.apache.spark.internal.Logging.log(Logging.scala:49)
        at org.apache.spark.internal.Logging.log$(Logging.scala:47)
        at org.apache.spark.SparkFunSuite.log(SparkFunSuite.scala:62)
        at org.apache.spark.SparkFunSuite.(SparkFunSuite.scala:74)
        at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackendSuite.(MesosCoarseGrainedSchedulerBackendSuite.scala:43)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at java.lang.Class.newInstance(Class.java:442)
        at org.scalatest.tools.DiscoverySuite$.getSuiteInstance(DiscoverySuite.scala:66)
        at org.scalatest.tools.DiscoverySuite.$anonfun$nestedSuites$1(DiscoverySuite.scala:38)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.scalatest.tools.DiscoverySuite.(DiscoveryS
[spark] branch master updated: [SPARK-37505][MESOS][TESTS] Add a log4j.properties for `mesos` module UT
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new fdb33dd [SPARK-37505][MESOS][TESTS] Add a log4j.properties for `mesos` module UT
fdb33dd is described below

commit fdb33dd9e27ac5d69ea875ca5bb85dfd369e71f1
Author: yangjie01
AuthorDate: Tue Nov 30 14:40:54 2021 -0800

[SPARK-37505][MESOS][TESTS] Add a log4j.properties for `mesos` module UT

### What changes were proposed in this pull request?
`scalatest-maven-plugin` configures `file:src/test/resources/log4j.properties` as the UT log configuration, so this PR adds this `log4j.properties` file to the mesos module for UT.

### Why are the changes needed?
Supplement the missing log4j configuration file for the mesos module.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass the Jenkins or GitHub Action
- Manual test

**Before**

Run

```
mvn clean install -pl resource-managers/mesos -Pmesos -am -DskipTests
mvn test -pl resource-managers/mesos -Pmesos
```

will print the following log: ``` log4j:ERROR Could not read configuration file from URL [file:src/test/resources/log4j.properties].
java.io.FileNotFoundException: src/test/resources/log4j.properties (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195) at java.io.FileInputStream.(FileInputStream.java:138) at java.io.FileInputStream.(FileInputStream.java:93) at sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90) at sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188) at org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557) at org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526) at org.apache.log4j.LogManager.(LogManager.java:127) at org.slf4j.impl.Log4jLoggerFactory.(Log4jLoggerFactory.java:66) at org.slf4j.impl.StaticLoggerBinder.(StaticLoggerBinder.java:72) at org.slf4j.impl.StaticLoggerBinder.(StaticLoggerBinder.java:45) at org.apache.spark.internal.Logging$.org$apache$spark$internal$Logging$$isLog4j12(Logging.scala:222) at org.apache.spark.internal.Logging.initializeLogging(Logging.scala:127) at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:111) at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:105) at org.apache.spark.SparkFunSuite.initializeLogIfNecessary(SparkFunSuite.scala:62) at org.apache.spark.internal.Logging.initializeLogIfNecessary(Logging.scala:102) at org.apache.spark.internal.Logging.initializeLogIfNecessary$(Logging.scala:101) at org.apache.spark.SparkFunSuite.initializeLogIfNecessary(SparkFunSuite.scala:62) at org.apache.spark.internal.Logging.log(Logging.scala:49) at org.apache.spark.internal.Logging.log$(Logging.scala:47) at org.apache.spark.SparkFunSuite.log(SparkFunSuite.scala:62) at org.apache.spark.SparkFunSuite.(SparkFunSuite.scala:74) at org.apache.spark.scheduler.cluster.mesos.MesosCoarseGrainedSchedulerBackendSuite.(MesosCoarseGrainedSchedulerBackendSuite.scala:43) at 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at java.lang.Class.newInstance(Class.java:442) at org.scalatest.tools.DiscoverySuite$.getSuiteInstance(DiscoverySuite.scala:66) at org.scalatest.tools.DiscoverySuite.$anonfun$nestedSuites$1(DiscoverySuite.scala:38) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.scalatest.tools.DiscoverySuite.(DiscoverySuite.sca
[spark] branch master updated: [MINOR][DOC] Update doc for `ResourceProfileManager.isSupported`
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new ac7c52d [MINOR][DOC] Update doc for `ResourceProfileManager.isSupported`
ac7c52d is described below

commit ac7c52db28f35237f78215c38b274a45c1ae7462
Author: Zhenhua Wang
AuthorDate: Tue Nov 30 20:03:04 2021 +0900

[MINOR][DOC] Update doc for `ResourceProfileManager.isSupported`

### What changes were proposed in this pull request?
Only doc update.

### Why are the changes needed?
The doc doesn't match the actual logic.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Not related.

Closes #34756 from wzhfy/improve_doc.

Authored-by: Zhenhua Wang
Signed-off-by: Hyukjin Kwon
---
 .../scala/org/apache/spark/resource/ResourceProfileManager.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index f3e9856..2858443 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -57,8 +57,10 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
   private val notRunningUnitTests = !isTesting
   private val testExceptionThrown = sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
 
-  // If we use anything except the default profile, its only supported on YARN right now.
-  // Throw an exception if not supported.
+  /**
+   * If we use anything except the default profile, it's only supported on YARN and Kubernetes
+   * with dynamic allocation enabled. Throw an exception if not supported.
+   */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
     val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
     val notYarnOrK8sAndNotDefaultProfile = isNotDefaultProfile && !(isYarn || isK8s)
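The updated doc comment for `isSupported` states a rule that may be easier to follow as a small standalone model. The sketch below is not Spark's implementation. `ClusterInfo`, `ProfileSupportSketch`, `DefaultProfileId` and the exception type are hypothetical names introduced only for illustration, and the dynamic-allocation check is an assumption derived from the wording of the comment, since only the first two `val`s of the real method are visible in the diff.

```scala
import scala.util.Try

// Hypothetical stand-in for the cluster state the real manager derives from SparkConf.
final case class ClusterInfo(isYarn: Boolean, isK8s: Boolean, dynamicAllocationEnabled: Boolean)

object ProfileSupportSketch {
  // Assumed placeholder for ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.
  val DefaultProfileId: Int = 0

  /** Returns true when the profile may be used and throws otherwise, as the doc comment describes. */
  def isSupported(profileId: Int, cluster: ClusterInfo): Boolean = {
    val isNotDefaultProfile = profileId != DefaultProfileId
    val onYarnOrK8s = cluster.isYarn || cluster.isK8s
    // Same shape as the condition visible in the diff context.
    val notYarnOrK8sAndNotDefaultProfile = isNotDefaultProfile && !onYarnOrK8s
    // Assumed second condition, inferred from "with dynamic allocation enabled".
    val noDynAllocAndNotDefaultProfile =
      isNotDefaultProfile && onYarnOrK8s && !cluster.dynamicAllocationEnabled
    if (notYarnOrK8sAndNotDefaultProfile || noDynAllocAndNotDefaultProfile) {
      throw new IllegalStateException(
        "Non-default profiles need YARN or Kubernetes with dynamic allocation enabled.")
    }
    true
  }
}

object ProfileSupportDemo extends App {
  val standalone = ClusterInfo(isYarn = false, isK8s = false, dynamicAllocationEnabled = true)
  val k8sDynamic = ClusterInfo(isYarn = false, isK8s = true, dynamicAllocationEnabled = true)

  // The default profile passes on any cluster manager.
  println(ProfileSupportSketch.isSupported(0, standalone))
  // A non-default profile passes on Kubernetes with dynamic allocation.
  println(ProfileSupportSketch.isSupported(1, k8sDynamic))
  // A non-default profile on a standalone cluster is rejected with an exception.
  println(Try(ProfileSupportSketch.isSupported(1, standalone)).isFailure)
}
```

In this model the default profile is always accepted, and every other profile is rejected unless the cluster manager is YARN or Kubernetes and dynamic allocation is on, which is the behaviour the revised comment documents.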