[spark] branch master updated: [SPARK-44198][CORE] Support propagation of the log level to the executors
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5fc90fbd4e3  [SPARK-44198][CORE] Support propagation of the log level to the executors
5fc90fbd4e3 is described below

commit 5fc90fbd4e3235fbcf038f4725037321b8234d94
Author: Vinod KC
AuthorDate: Thu Jul 27 16:39:33 2023 -0700

    [SPARK-44198][CORE] Support propagation of the log level to the executors

    ### What changes were proposed in this pull request?
    Currently, the **sc.setLogLevel()** method only sets the log level on the Spark driver and fails to reflect the desired log level on the executors. With _--conf **spark.log.level**_ or **sc.setLogLevel()**, Spark allows tuning the log level of the driver process, but the setting is not reflected on the executors.

    ### Why are the changes needed?
    This inconsistency can make debugging and monitoring Spark applications difficult, because log messages from the executors may not match the log level expected by the user code. This PR propagates log level changes to the executors when sc.setLogLevel() is called, and sends the current log level when a new executor registers.

    ### Does this PR introduce _any_ user-facing change?
    No, but with this PR the driver and the executors will use the same log level.

    ### How was this patch tested?
    Tested manually to verify that the driver and the executors show the same log level.

    Closes #41746 from vinodkc/br_support_setloglevel_executors.

    Authored-by: Vinod KC
    Signed-off-by: attilapiros
---
 .../main/scala/org/apache/spark/SparkContext.scala  | 11 --
 .../executor/CoarseGrainedExecutorBackend.scala     |  4
 .../org/apache/spark/internal/config/package.scala  |  8 +++
 .../apache/spark/scheduler/SchedulerBackend.scala   |  1 +
 .../cluster/CoarseGrainedClusterMessage.scala       |  7 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala     | 20 -
 .../main/scala/org/apache/spark/util/Utils.scala    | 25 +++---
 7 files changed, 69 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 26fdb86d299..f48cb32b319 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -40,7 +40,6 @@ import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, Doub
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.logging.log4j.Level

 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
@@ -396,7 +395,10 @@ class SparkContext(config: SparkConf) extends Logging {
     require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
       s"Supplied level $logLevel did not match one of:" +
       s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
-    Utils.setLogLevel(Level.toLevel(upperCased))
+    Utils.setLogLevelIfNeeded(upperCased)
+    if (conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL) && _schedulerBackend != null) {
+      _schedulerBackend.updateExecutorsLogLevel(upperCased)
+    }
   }

   try {
@@ -585,6 +587,11 @@ class SparkContext(config: SparkConf) extends Logging {
     _dagScheduler = new DAGScheduler(this)
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

+    if (_conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) {
+      _conf.get(SPARK_LOG_LEVEL)
+        .foreach(logLevel => _schedulerBackend.updateExecutorsLogLevel(logLevel))
+    }
+
     val _executorMetricsSource =
       if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
         Some(new ExecutorMetricsSource)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ab238626efe..da009f5addb 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -177,6 +177,8 @@ private[spark] class CoarseGrainedExecutorBackend(
         case NonFatal(e) =>
           exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
       }

+    case UpdateExecutorLogLevel(newLogLevel) =>
+      Utils.setLogLevelIfNeeded(newLogLevel)

     case LaunchTask(data) =>
       if (executor == null) {
@@ -473,6 +475,8 @
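As a hedged usage sketch of the behavior this commit describes: with the propagation in place, a log level set on the driver is also forwarded to every registered executor. The SparkSession boilerplate below is illustrative only, and the configuration key behind `EXECUTOR_ALLOW_SYNC_LOG_LEVEL` is not shown in the excerpt above, so the sketch relies solely on `setLogLevel`.

```scala
// Hedged usage sketch (not part of the patch): after SPARK-44198 the level set on the
// driver is also sent to the executors via the scheduler backend.
import org.apache.spark.sql.SparkSession

object LogLevelPropagationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("log-level-demo").getOrCreate()

    // Before this change only the driver's logging configuration was updated;
    // now the backend also sends an UpdateExecutorLogLevel message to the executors.
    spark.sparkContext.setLogLevel("DEBUG")

    spark.range(0, 1000000).count() // executor-side logs are now emitted at DEBUG as well
    spark.stop()
  }
}
```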
[spark] branch master updated: [SPARK-35084][CORE] Fixing --packages in k8s client mode with the driver running inside a POD
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 9dc792d91c9  [SPARK-35084][CORE] Fixing --packages in k8s client mode with the driver running inside a POD
9dc792d91c9 is described below

commit 9dc792d91c95f7ce290fff96e979e5540180f8a2
Author: Keunhyun Oh
AuthorDate: Sat Jan 14 17:29:55 2023 -0800

    [SPARK-35084][CORE] Fixing --packages in k8s client mode with the driver running inside a POD

    ### What changes were proposed in this pull request?
    Support '--packages' in the k8s cluster mode.

    ### Why are the changes needed?
    In Spark 3, '--packages' is not supported in the k8s cluster mode. I expected to be able to manage dependencies with packages, as in Spark 2.

    Spark 2.4.5
    https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

    ```scala
    if (!isMesosCluster && !isStandAloneCluster) {
      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
      // too for packages that include Python code
      val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
        args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
        args.ivySettingsPath)
      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
        if (args.isPython || isInternal(args.primaryResource)) {
          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
        }
      }
      // install any R packages that may have been passed through --jars or --packages.
      // Spark Packages may contain R source code inside the jar.
      if (args.isR && !StringUtils.isBlank(args.jars)) {
        RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
      }
    }
    ```

    Spark 3.0.2
    https://github.com/apache/spark/blob/v3.0.2/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

    ```scala
    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
      // In K8s client mode, when in the driver, add resolved jars early as we might need
      // them at the submit time for artifact downloading.
      // For example we might use the dependencies for downloading
      // files from a Hadoop Compatible fs eg. S3. In this case the user might pass:
      // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
      if (isKubernetesClusterModeDriver) {
        val loader = getSubmitClassLoader(sparkConf)
        for (jar <- resolvedMavenCoordinates.split(",")) {
          addJarToClasspath(jar, loader)
        }
      } else if (isKubernetesCluster) {
        // We need this in K8s cluster mode so that we can upload local deps
        // via the k8s application, like in cluster mode driver
        childClasspath ++= resolvedMavenCoordinates.split(",")
      } else {
        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
        if (args.isPython || isInternal(args.primaryResource)) {
          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
        }
      }
    }
    ```

    Unlike Spark 2, in Spark 3 the resolved jars are not added anywhere.

    ### Does this PR introduce _any_ user-facing change?
    Unlike Spark 2, the resolved jars are added not during cluster-mode spark-submit but in the driver. This is because Spark 3 added a feature that uploads jars with the "file://" prefix to S3; if the resolved jars were added during spark-submit, every jar coming from packages would be uploaded to S3. When I tested it, that was a very bad experience.

    ### How was this patch tested?
    1) I tested the code in my k8s environment.
    2) Unit tests.

    Closes #38828 from ocworld/SPARK-35084-CORE-k8s-pacakges.

    Authored-by: Keunhyun Oh
    Signed-off-by: attilapiros
---
 .../org/apache/spark/deploy/SparkSubmit.scala       | 24 ++
 .../org/apache/spark/deploy/SparkSubmitSuite.scala  | 29 ++
 2 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a701b0ea607..fa19c7918af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -319,21 +319,23 @@ private[spark] class SparkSubmit extends Lo
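To make the scenario concrete, here is a hedged launch sketch using the SparkLauncher API. The master URL, application jar, main class and package coordinates are placeholders, and `spark.jars.packages` is simply the configuration equivalent of the `--packages` flag; this is not the test added by the PR.

```scala
// Hedged usage sketch (placeholders throughout): a client-mode application whose driver
// runs inside a pod and pulls extra dependencies via Maven coordinates.
import org.apache.spark.launcher.SparkLauncher

object PackagesOnK8sDemo {
  def main(args: Array[String]): Unit = {
    val process = new SparkLauncher()
      .setMaster("k8s://https://kubernetes.default.svc") // placeholder API server URL
      .setDeployMode("client")                           // driver runs inside a pod
      .setAppResource("local:///opt/app/my-app.jar")     // placeholder application jar
      .setMainClass("com.example.MyApp")                 // placeholder main class
      // Equivalent of --packages; with this fix the resolved jars also reach the
      // executors instead of only being added to the driver's classloader.
      .setConf("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1")
      .launch()
    process.waitFor()
  }
}
```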
[spark] branch branch-3.2 updated (4b1e06be85a -> b43e38b38ee)
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 4b1e06be85a  [SPARK-40636][CORE] Fix wrong remained shuffles log in BlockManagerDecommissioner
     add b43e38b38ee  [SPARK-40617][CORE][3.2] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

No new revisions were added by this update.

Summary of changes:
 .../spark/executor/ExecutorMetricsPoller.scala       | 21 -
 .../spark/executor/ExecutorMetricsPollerSuite.scala  |  4 ++--
 2 files changed, 14 insertions(+), 11 deletions(-)
[spark] branch branch-3.3 updated: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 90a27757ec1  [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
90a27757ec1 is described below

commit 90a27757ec17c2511049114a437f365326e51225
Author: attilapiros
AuthorDate: Mon Oct 3 06:23:51 2022 -0700

    [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

    ### What changes were proposed in this pull request?
    Fix a race condition in ExecutorMetricsPoller between the `getExecutorUpdates()` and `onTaskStart()` methods by avoiding the removal of entries for stages that have not finished starting tasks yet.

    ### Why are the changes needed?
    Spurious failures are reported because of the following assert:

    ```
    22/09/29 09:46:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 3063.0 in stage 1997.0 (TID 677249),5,main]
    java.lang.AssertionError: assertion failed: task count shouldn't below 0
        at scala.Predef$.assert(Predef.scala:223)
        at org.apache.spark.executor.ExecutorMetricsPoller.decrementCount$1(ExecutorMetricsPoller.scala:130)
        at org.apache.spark.executor.ExecutorMetricsPoller.$anonfun$onTaskCompletion$3(ExecutorMetricsPoller.scala:135)
        at java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1822)
        at org.apache.spark.executor.ExecutorMetricsPoller.onTaskCompletion(ExecutorMetricsPoller.scala:135)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:737)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
    22/09/29 09:46:24 INFO MemoryStore: MemoryStore cleared
    22/09/29 09:46:24 INFO BlockManager: BlockManager stopped
    22/09/29 09:46:24 INFO ShutdownHookManager: Shutdown hook called
    22/09/29 09:46:24 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1664443624160_0001/spark-93efc2d4-84de-494b-a3b7-2cb1c3a45426
    ```

    I have checked the code, and the basic assumption of having at least as many `onTaskStart()` calls as `onTaskCompletion()` calls for the same `stageId` & `stageAttemptId` pair is correct. But there is a race condition between `getExecutorUpdates()` and `onTaskStart()`.

    First of all, we have two different threads:
    - task runner: executes the task and informs `ExecutorMetricsPoller` about task starts and completions
    - heartbeater: uses the `ExecutorMetricsPoller` to get the metrics

    To show the race condition, assume a task that was running on its own (no other task was running) has just finished, which decreases the `count` from 1 to 0. On the task runner thread, let's say a new task starts, so execution is inside the `onTaskStart()` method: assume `countAndPeaks` is already computed, so the counter is 0, but execution has not yet incremented it. So we are in between the following two lines:

    ```scala
    val countAndPeaks = stageTCMP.computeIfAbsent((stageId, stageAttemptId),
      _ => TCMP(new AtomicLong(0), new AtomicLongArray(ExecutorMetricType.numMetrics)))
    val stageCount = countAndPeaks.count.incrementAndGet()
    ```

    Let's look at the other thread (heartbeater), where `getExecutorUpdates()` is running and is at the `removeIfInactive()` method:

    ```scala
    def removeIfInactive(k: StageKey, v: TCMP): TCMP = {
      if (v.count.get == 0) {
        logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
        null
      } else {
        v
      }
    }
    ```

    Here the entry is removed from `stageTCMP` because the count is 0. Going back to the task runner thread, we increase the counter to 1, but that value is lost because there is no longer an entry in `stageTCMP` for this stage and attempt. So if a new task comes, `stageTCMP` will contain 1 instead of 2, and when those two tasks finish, the second one will decrease the counter from 0 to -1. This is when the assert is raised.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Unit test. I managed to reproduce the issue with a temporary test:

    ```scala
    test("reproduce assert failure") {
      val testMemoryManager = new TestMemoryManag
    ```
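To make the interleaving tangible, here is a small self-contained sketch (plain JVM code, not Spark; the names merely echo the description above) that loses an increment in exactly the way described: one thread increments a counter stored in a ConcurrentHashMap entry while another thread removes that entry because it currently reads zero.

```scala
// Minimal, self-contained illustration of the lost-increment race described above.
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

object LostIncrementSketch {
  def main(args: Array[String]): Unit = {
    val stageTCMP = new ConcurrentHashMap[(Int, Int), AtomicLong]()
    val stageKey = (1997, 0)

    // "onTaskStart": look up (or create) the entry ... but do not increment yet.
    val counter = stageTCMP.computeIfAbsent(stageKey, _ => new AtomicLong(0))

    // "heartbeater" interleaves here: removeIfInactive drops the entry because count == 0.
    stageTCMP.computeIfPresent(stageKey, (_, v) => if (v.get == 0) null else v)

    // "onTaskStart" resumes: the increment lands on an object that is no longer in the map.
    counter.incrementAndGet()
    println(s"tracked: ${stageTCMP.containsKey(stageKey)}") // false -> the running task is lost
  }
}
```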
[spark] branch master updated (08708225417 -> 564a51b64e7)
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 08708225417  [SPARK-40509][SS][PYTHON][FOLLLOW-UP] Add example for applyInPandasWithState followup
     add 564a51b64e7  [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

No new revisions were added by this update.

Summary of changes:
 .../spark/executor/ExecutorMetricsPoller.scala       | 21 -
 .../spark/executor/ExecutorMetricsPollerSuite.scala  |  4 ++--
 2 files changed, 14 insertions(+), 11 deletions(-)
[spark] branch master updated (ef837ca -> fcc5176)
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from ef837ca  [SPARK-37905][INFRA] Make `merge_spark_pr.py` set primary author from the first commit in case of ties
     add fcc5176  [SPARK-36967][CORE] Report accurate shuffle block size if its skewed

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/internal/config/package.scala  | 24 ++
 .../org/apache/spark/scheduler/MapStatus.scala      | 32 ++-
 .../main/scala/org/apache/spark/util/Utils.scala    | 16
 .../apache/spark/scheduler/MapStatusSuite.scala     | 97 ++
 .../execution/adaptive/OptimizeSkewedJoin.scala     | 15 +---
 5 files changed, 169 insertions(+), 15 deletions(-)
[spark] branch master updated: [SPARK-36406][CORE] Avoid unnecessary file operations before delete a write failed file held by DiskBlockObjectWriter
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 6f0b1c9  [SPARK-36406][CORE] Avoid unnecessary file operations before delete a write failed file held by DiskBlockObjectWriter
6f0b1c9 is described below

commit 6f0b1c9c9c7aec600358ee15f6d6ac8cf67864e9
Author: yangjie01
AuthorDate: Mon Dec 20 09:52:38 2021 +0100

    [SPARK-36406][CORE] Avoid unnecessary file operations before delete a write failed file held by DiskBlockObjectWriter

    ### What changes were proposed in this pull request?
    We always perform a file truncate operation before deleting a write-failed file held by `DiskBlockObjectWriter`; a typical code path looks like this:

    ```
    if (!success) {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      writer.revertPartialWritesAndClose()
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
    ```

    The `revertPartialWritesAndClose` method reverts writes that haven't been committed yet, but that does not seem necessary in this scenario. So this PR adds a new method to `DiskBlockObjectWriter` named `closeAndDelete()`; the new method just reverts the write metrics and deletes the write-failed file.

    ### Why are the changes needed?
    Avoid unnecessary file operations.

    ### Does this PR introduce _any_ user-facing change?
    Adds a new method to `DiskBlockObjectWriter` named `closeAndDelete()`.

    ### How was this patch tested?
    Pass the Jenkins or GitHub Actions runs.

    Closes #33628 from LuciferYang/SPARK-36406.

    Authored-by: yangjie01
    Signed-off-by: attilapiros
---
 .../shuffle/sort/BypassMergeSortShuffleWriter.java   |  5 +
 .../spark/storage/DiskBlockObjectWriter.scala        | 26 ++
 .../util/collection/ExternalAppendOnlyMap.scala      |  7 +-
 .../spark/util/collection/ExternalSorter.scala       |  7 +-
 .../spark/storage/DiskBlockObjectWriterSuite.scala   | 19
 5 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 9a5ac6f..da7a518 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -289,10 +289,7 @@ final class BypassMergeSortShuffleWriter
       try {
         for (DiskBlockObjectWriter writer : partitionWriters) {
           // This method explicitly does _not_ throw exceptions:
-          File file = writer.revertPartialWritesAndClose();
-          if (!file.delete()) {
-            logger.error("Error while deleting file {}", file.getAbsolutePath());
-          }
+          writer.closeAndDelete();
         }
       } finally {
         partitionWriters = null;
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 4170609..3bdae2f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 import java.io.{BufferedOutputStream, File, FileOutputStream, IOException, OutputStream}
 import java.nio.channels.{ClosedByInterruptException, FileChannel}
+import java.nio.file.Files
 import java.util.zip.Checksum

 import org.apache.spark.errors.SparkCoreErrors
@@ -119,6 +120,11 @@ private[spark] class DiskBlockObjectWriter(
   private var numRecordsWritten = 0

   /**
+   * Keep track the number of written records committed.
+   */
+  private var numRecordsCommitted = 0L
+
+  /**
    * Set the checksum that the checksumOutputStream should use
    */
   def setChecksum(checksum: Checksum): Unit = {
@@ -223,6 +229,7 @@ private[spark] class DiskBlockObjectWriter(
       // In certain compression codecs, more bytes are written after streams are closed
       writeMetrics.incBytesWritten(committedPosition - reportedPosition)
       reportedPosition = committedPosition
+      numRecordsCommitted += numRecordsWritten
       numRecordsWritten = 0
       fileSegment
     } else {
@@ -273,6 +280,25 @@ private[spark] class DiskBlockObjectWriter(
   }

   /**
+   * Reverts write metrics and delete the file held by current `DiskBlockObjectWriter`.
+   * Callers should invoke this function when there are runtime exceptions in file
+   * writing process an
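To see why the truncate step is wasted work on this error path, here is a tiny self-contained sketch (plain JVM I/O, not Spark code): when the partially written file is going to be removed anyway, deleting it directly is enough.

```scala
// Minimal illustration of the idea behind closeAndDelete(): on a failed write there is
// no point truncating the file back to its committed position before deleting it.
import java.io.{File, FileOutputStream}
import java.nio.file.Files

object DeleteWithoutTruncate {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("spill", ".data")
    val out = new FileOutputStream(file)
    try {
      out.write(Array.fill[Byte](4096)(1.toByte)) // partial, uncommitted write
      throw new RuntimeException("simulated write failure")
    } catch {
      case _: RuntimeException =>
        out.close()
        // Old pattern: revert (truncate to the committed position), then delete.
        // New pattern: just close, revert the metrics, and delete the file.
        Files.deleteIfExists(file.toPath)
    }
  }
}
```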
[spark] branch master updated: [SPARK-35672][CORE][YARN] Pass user classpath entries to executors using config instead of command line
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 693537f  [SPARK-35672][CORE][YARN] Pass user classpath entries to executors using config instead of command line
693537f is described below

commit 693537f7852b8a3b9f3cf3dc21eb1c132f64db0d
Author: Erik Krogen
AuthorDate: Tue Nov 16 18:08:48 2021 +0100

    [SPARK-35672][CORE][YARN] Pass user classpath entries to executors using config instead of command line

    ### What changes were proposed in this pull request?
    Refactor the logic for constructing the user classpath from `yarn.ApplicationMaster` into `yarn.Client` so that it can be leveraged on the executor side as well, instead of having the driver construct it and pass it to the executor via command-line arguments. A new method, `getUserClassPath`, is added to `CoarseGrainedExecutorBackend` which defaults to `Nil` (consistent with the existing behavior where non-YARN resource managers do not configure the user classpath). `YarnCoarseGrained [...]

    Please note that this is a re-submission of #32810, which was reverted in #34082 due to the issues described in [this comment](https://issues.apache.org/jira/browse/SPARK-35672?focusedCommentId=17419285&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17419285). This PR additionally includes the changes described in #34084 to resolve the issue, though this PR has been enhanced to properly handle escape strings, unlike #34084.

    ### Why are the changes needed?
    User-provided JARs are made available to executors using a custom classloader, so they do not appear on the standard Java classpath. Instead, they are passed as a list to the executor which then creates a classloader out of the URLs. Currently in the case of YARN, this list of JARs is crafted by the Driver (in `ExecutorRunnable`), which then passes the information to the executors (`CoarseGrainedExecutorBackend`) by specifying each JAR on the executor command line as `--user-class-pat [...]

    > /bin/bash: Argument list too long

    A [Google search](https://www.google.com/search?q=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22&oq=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22) indicates that this is not a theoretical problem and afflicts real users, including ours. Passing this list using the configurations instead resolves this issue.

    ### Does this PR introduce _any_ user-facing change?
    There is one small behavioral change which is a bug fix. Previously the `spark.yarn.config.gatewayPath` and `spark.yarn.config.replacementPath` options were only applied to executors, meaning they would not work for the driver when running in cluster mode. This appears to be a bug; the [documentation for this functionality](https://spark.apache.org/docs/latest/running-on-yarn.html) does not mention any limitations that this is only for executors. This PR fixes that issue.

    Additionally, this fixes the main bash argument length issue, allowing for larger JAR lists to be passed successfully. Configuration of JARs is identical to before, and substitution of environment variables in `spark.jars` or `spark.yarn.config.replacementPath` works as expected.

    ### How was this patch tested?
    New unit tests were added in `YarnClusterSuite`. Also, we have been running a similar fix internally for 4 months with great success.

    Closes #34120 from xkrogen/xkrogen-SPARK-35672-yarn-classpath-list-take2.

    Authored-by: Erik Krogen
    Signed-off-by: attilapiros
---
 .../executor/CoarseGrainedExecutorBackend.scala      | 17 ++---
 .../scala/org/apache/spark/executor/Executor.scala   |  2 +
 .../CoarseGrainedExecutorBackendSuite.scala          | 17 ++---
 .../cluster/k8s/KubernetesExecutorBackend.scala      |  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala        |  9 +--
 .../org/apache/spark/deploy/yarn/Client.scala        | 34 -
 .../spark/deploy/yarn/ExecutorRunnable.scala         | 12 ---
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala      | 75 ++-
 .../YarnCoarseGrainedExecutorBackend.scala           |  8 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala   | 35 +
 .../spark/deploy/yarn/YarnClusterSuite.scala         | 86 +++---
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala       | 40 ++
 12 files changed, 280 insertions(+), 57 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4f63ada..43887a7 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spa
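The core mechanism described above is a hook on the executor backend that derives the user classpath from configuration rather than from repeated `--user-class-path` command-line arguments. The sketch below is illustrative only: the method name mirrors the PR description, but the configuration key and the parsing are hypothetical stand-ins, not the actual YARN implementation.

```scala
// Hedged sketch of the getUserClassPath hook; the config key and parsing are assumptions,
// the real patch derives the entries via yarn.Client / YarnSparkHadoopUtil.
import java.net.URL

abstract class ExecutorBackendSketch {
  // Default: non-YARN resource managers do not configure a user classpath.
  def getUserClassPath: Seq[URL] = Nil
}

class YarnExecutorBackendSketch(conf: Map[String, String]) extends ExecutorBackendSketch {
  // "spark.yarn.user.jars" is a hypothetical key used only for this illustration.
  override def getUserClassPath: Seq[URL] =
    conf.getOrElse("spark.yarn.user.jars", "")
      .split(",")
      .filter(_.nonEmpty)
      .map(new URL(_))
      .toSeq
}
```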
[spark] branch master updated (0bba90b -> c29bb02)
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 0bba90b  [SPARK-36978][SQL] InferConstraints rule should create IsNotNull constraints on the accessed nested field instead of the root nested type
     add c29bb02  [SPARK-36965][PYTHON] Extend python test runner by logging out the temp output files

No new revisions were added by this update.

Summary of changes:
 python/run-tests.py | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)
[spark] branch master updated (8e92ef8 -> 03e48c8)
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 8e92ef8  [SPARK-35749][SPARK-35773][SQL] Parse unit list interval literals as tightest year-month/day-time interval types
     add 03e48c8  [SPARK-35334][K8S] Make Spark more resilient to intermittent K8s flakiness

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala | 10 ++
 1 file changed, 10 insertions(+)
[spark] branch master updated (a70e66e -> 4534c0c)
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from a70e66e  [SPARK-35665][SQL] Resolve UnresolvedAlias in CollectMetrics
     add 4534c0c  [SPARK-35543][CORE] Fix memory leak in BlockManagerMasterEndpoint removeRdd

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/storage/BlockManagerMasterEndpoint.scala | 12 +---
 1 file changed, 9 insertions(+), 3 deletions(-)
[spark] branch master updated (d2a535f -> 8b94eff)
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from d2a535f  [SPARK-34246][FOLLOWUP] Change the definition of `findTightestCommonType` for backward compatibility
     add 8b94eff  [SPARK-34736][K8S][TESTS] Kubernetes and Minikube version upgrade for integration tests

No new revisions were added by this update.

Summary of changes:
 .../kubernetes/integration-tests/README.md           |   4 +-
 .../dev/dev-run-integration-tests.sh                 |   6 +-
 .../k8s/integrationtest/KubernetesSuite.scala        |   1 +
 .../deploy/k8s/integrationtest/PVTestsSuite.scala    |   2 +-
 .../backend/minikube/Minikube.scala                  | 113 -
 5 files changed, 52 insertions(+), 74 deletions(-)
[spark] branch master updated (4e3daa5 -> 738cf7f)
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 4e3daa5  [SPARK-35254][BUILD] Upgrade SBT to 1.5.1
     add 738cf7f  [SPARK-35009][CORE] Avoid creating multiple python worker monitor threads for the same worker and same task context

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/api/python/PythonRunner.scala | 28 --
 1 file changed, 26 insertions(+), 2 deletions(-)
[spark] branch master updated (132cbf0 -> 068b6c8)
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 132cbf0  [SPARK-35105][SQL] Support multiple paths for ADD FILE/JAR/ARCHIVE commands
     add 068b6c8  [SPARK-35234][CORE] Reserve the format of stage failureMessage

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala | 14 +-
 1 file changed, 5 insertions(+), 9 deletions(-)
[spark] branch master updated (71133e1 -> 2cb962b)
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 71133e1  [SPARK-35070][SQL] TRANSFORM not support alias in inputs
     add 2cb962b  [MINOR][CORE] Correct the number of started fetch requests in log

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
[spark] branch master updated: [SPARK-34779][CORE] ExecutorMetricsPoller should keep stage entry in stageTCMP until a heartbeat occurs
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f03c7c0  [SPARK-34779][CORE] ExecutorMetricsPoller should keep stage entry in stageTCMP until a heartbeat occurs
f03c7c0 is described below

commit f03c7c0e9dd6b6b87049b8546460b7f21c086749
Author: Baohe Zhang
AuthorDate: Fri Apr 2 07:14:18 2021 +0200

    [SPARK-34779][CORE] ExecutorMetricsPoller should keep stage entry in stageTCMP until a heartbeat occurs

    ### What changes were proposed in this pull request?
    Allow ExecutorMetricsPoller to keep stage entries in stageTCMP until a heartbeat occurs even if the entries have task count = 0.

    ### Why are the changes needed?
    This is an improvement. The current implementation of ExecutorMetricsPoller keeps a map, stageTCMP, of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks). The entry for the stage is removed on task completion if the task count decreases to 0. In the case of an executor with a single core, this leads to unnecessary removal and insertion of entries for a given stage.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    A new unit test is added.

    Closes #31871 from baohe-zhang/SPARK-34779.

    Authored-by: Baohe Zhang
    Signed-off-by: “attilapiros”
---
 .../spark/executor/ExecutorMetricsPoller.scala       | 29 -
 .../executor/ExecutorMetricsPollerSuite.scala        | 48 ++
 2 files changed, 67 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala
index 1c1a1ca..0cdb306 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala
@@ -53,10 +53,10 @@ private[spark] class ExecutorMetricsPoller(
   type StageKey = (Int, Int)

   // Task Count and Metric Peaks
-  private case class TCMP(count: AtomicLong, peaks: AtomicLongArray)
+  private[executor] case class TCMP(count: AtomicLong, peaks: AtomicLongArray)

   // Map of (stageId, stageAttemptId) to (count of running tasks, executor metric peaks)
-  private val stageTCMP = new ConcurrentHashMap[StageKey, TCMP]
+  private[executor] val stageTCMP = new ConcurrentHashMap[StageKey, TCMP]

   // Map of taskId to executor metric peaks
   private val taskMetricPeaks = new ConcurrentHashMap[Long, AtomicLongArray]
@@ -124,17 +124,12 @@ private[spark] class ExecutorMetricsPoller(
    */
   def onTaskCompletion(taskId: Long, stageId: Int, stageAttemptId: Int): Unit = {
     // Decrement the task count.
-    // Remove the entry from stageTCMP if the task count reaches zero.
     def decrementCount(stage: StageKey, countAndPeaks: TCMP): TCMP = {
       val countValue = countAndPeaks.count.decrementAndGet()
-      if (countValue == 0L) {
-        logDebug(s"removing (${stage._1}, ${stage._2}) from stageTCMP")
-        null
-      } else {
-        logDebug(s"stageTCMP: (${stage._1}, ${stage._2}) -> " + countValue)
-        countAndPeaks
-      }
+      assert(countValue >= 0, "task count shouldn't below 0")
+      logDebug(s"stageTCMP: (${stage._1}, ${stage._2}) -> " + countValue)
+      countAndPeaks
     }

     stageTCMP.computeIfPresent((stageId, stageAttemptId), decrementCount)
@@ -176,6 +171,20 @@ private[spark] class ExecutorMetricsPoller(

     stageTCMP.replaceAll(getUpdateAndResetPeaks)

+    def removeIfInactive(k: StageKey, v: TCMP): TCMP = {
+      if (v.count.get == 0) {
+        logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
+        null
+      } else {
+        v
+      }
+    }
+
+    // Remove the entry from stageTCMP if the task count reaches zero.
+    executorUpdates.foreach { case (k, _) =>
+      stageTCMP.computeIfPresent(k, removeIfInactive)
+    }
+
     executorUpdates
   }

diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorMetricsPollerSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorMetricsPollerSuite.scala
new file mode 100644
index 000..e471864
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorMetricsPollerSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
[spark-website] branch asf-site updated: Promoting SSH remotes and document their purpose, mentioning GitBox
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new c7e0d8e  Promoting SSH remotes and document their purpose, mentioning GitBox
c7e0d8e is described below

commit c7e0d8e60838cefabf5b06f45e32abb64612db01
Author: attilapiros
AuthorDate: Fri Mar 19 19:16:34 2021 +0100

    Promoting SSH remotes and document their purpose, mentioning GitBox

    Author: attilapiros

    Closes #329 from attilapiros/update_committer_page.
---
 committers.md        | 25 ++---
 site/committers.html | 29 -
 2 files changed, 30 insertions(+), 24 deletions(-)

diff --git a/committers.md b/committers.md
index 88c8fb5..529fdfe 100644
--- a/committers.md
+++ b/committers.md
@@ -170,20 +170,21 @@ To use the `merge_spark_pr.py` script described below, you will
 need to add a git remote called `apache` at `https://github.com/apache/spark`,
 as well as one called `apache-github` at `git://github.com/apache/spark`.

-You will likely also have a remote `origin` pointing to your fork of Spark, and
-`upstream` pointing to the `apache/spark` GitHub repo.
+The `apache` (the default value of `PUSH_REMOTE_NAME` environment variable) is the remote used for pushing the squashed commits
+and `apache-github` (default value of `PR_REMOTE_NAME`) is the remote used for pulling the changes.
+By using two separate remotes for these two actions the result of the `merge_spark_pr.py` can be tested without pushing it
+into the official Spark repo just by specifying your fork in the `PUSH_REMOTE_NAME` variable.

-If correct, your `git remote -v` should look like:
+After cloning your fork of Spark you already have a remote `origin` pointing there. So if correct, your `git remote -v`
+contains at least these lines:

 ```
-apache https://github.com/apache/spark.git (fetch)
-apache https://github.com/apache/spark.git (push)
-apache-github git://github.com/apache/spark (fetch)
-apache-github git://github.com/apache/spark (push)
-origin https://github.com/[your username]/spark.git (fetch)
-origin https://github.com/[your username]/spark.git (push)
-upstream https://github.com/apache/spark.git (fetch)
-upstream https://github.com/apache/spark.git (push)
+apache g...@github.com:apache/spark-website.git (fetch)
+apache g...@github.com:apache/spark-website.git (push)
+apache-github g...@github.com:apache/spark-website.git (fetch)
+apache-github g...@github.com:apache/spark-website.git (push)
+origin g...@github.com:[your username]/spark-website.git (fetch)
+origin g...@github.com:[your username]/spark-website.git (push)
 ```

 For the `apache` repo, you will need to set up command-line authentication to GitHub. This may
@@ -192,6 +193,8 @@ include setting up an SSH key and/or personal access token. See:
 - https://help.github.com/articles/connecting-to-github-with-ssh/
 - https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/

+To check whether the necessary write access are already granted please visit [GitBox](https://gitbox.apache.org/setup/).
+
 Ask `d...@spark.apache.org` if you have trouble with these steps, or want help doing your first merge.

 Merge Script
diff --git a/site/committers.html b/site/committers.html
index 9cc9d97..7760439 100644
--- a/site/committers.html
+++ b/site/committers.html
@@ -618,19 +618,20 @@ it. So please don’t add any test commits or anything like that, only real
 will need to add a git remote called apache at https://github.com/apache/spark,
 as well as one called apache-github at git://github.com/apache/spark.

-You will likely also have a remote origin pointing to your fork of Spark, and
-upstream pointing to the apache/spark GitHub repo.
-
-If correct, your git remote -v should look like:
-
-apache https://github.com/apache/spark.git (fetch)
-apache https://github.com/apache/spark.git (push)
-apache-github git://github.com/apache/spark (fetch)
-apache-github git://github.com/apache/spark (push)
-origin https://github.com/[your username]/spark.git (fetch)
-origin https://github.com/[your username]/spark.git (push)
-upstream https://github.com/apache/spark.git (fetch)
-upstream https://github.com/apache/spark.git (push)
+The apache (the default value of PUSH_REMOTE_NAME environment variable) is the remote used for pushing the squashed commits
+and apache-github (default value of PR_REMOTE_NAME) is the remote used for pulling the changes.
+By using two separate remotes for these two actions the result of the merge_spark_pr.py can be tested without pushing it
+into the official Spark repo just by specifying your fork in the PUSH_REMOTE_NAME variable.
+
+After cloning your fork of Spark you already have a remote origin pointing there. So if correct, your git remote -v
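For convenience, the remote setup that committers.md describes can be reproduced with the commands below. This is a hedged sketch following the section's own instructions, not part of the commit: substitute your GitHub username, and it assumes the merge script lives at dev/merge_spark_pr.py in your checkout.

```
# After cloning your fork (which already provides the `origin` remote), add the two
# remotes that merge_spark_pr.py relies on (SSH form, as promoted by this commit):
git remote add apache git@github.com:apache/spark.git
git remote add apache-github git@github.com:apache/spark.git

# Optional: test a merge by pushing to your own fork instead of the official repo
PUSH_REMOTE_NAME=origin PR_REMOTE_NAME=apache-github ./dev/merge_spark_pr.py
```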
[spark-website] branch asf-site updated: Add Attila Zsolt Piros to committers
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new fc65ac1  Add Attila Zsolt Piros to committers
fc65ac1 is described below

commit fc65ac1f9dbc3cb25fc58703f74aefb51a157c32
Author: attilapiros
AuthorDate: Fri Mar 19 12:25:25 2021 +0100

    Add Attila Zsolt Piros to committers

    Author: attilapiros

    Closes #328 from attilapiros/add_committer.
---
 committers.md        | 1 +
 site/committers.html | 4
 2 files changed, 5 insertions(+)

diff --git a/committers.md b/committers.md
index ae84112..88c8fb5 100644
--- a/committers.md
+++ b/committers.md
@@ -62,6 +62,7 @@ navigation:
 |Sean Owen|Databricks|
 |Tejas Patil|Facebook|
 |Nick Pentreath|IBM|
+|Attila Zsolt Piros|Cloudera|
 |Anirudh Ramanathan|Rockset|
 |Imran Rashid|Cloudera|
 |Charles Reiss|University of Virginia|
diff --git a/site/committers.html b/site/committers.html
index 5985718..9cc9d97 100644
--- a/site/committers.html
+++ b/site/committers.html
@@ -419,6 +419,10 @@ IBM
+  Attila Zsolt Piros
+  Cloudera
+
+
   Anirudh Ramanathan
   Rockset