[spark] branch master updated: [SPARK-44198][CORE] Support propagation of the log level to the executors

2023-07-27 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5fc90fbd4e3 [SPARK-44198][CORE] Support propagation of the log level 
to the executors
5fc90fbd4e3 is described below

commit 5fc90fbd4e3235fbcf038f4725037321b8234d94
Author: Vinod KC 
AuthorDate: Thu Jul 27 16:39:33 2023 -0700

[SPARK-44198][CORE] Support propagation of the log level to the executors

### What changes were proposed in this pull request?

Currently, the **sc.setLogLevel()** method only sets the log level on the Spark driver; the desired log level is not reflected on the executors. With _--conf **spark.log.level**_ or **sc.setLogLevel()**, Spark allows tuning the log level of the driver process, but the change does not reach the executors.

### Why are the changes needed?
This inconsistency can make debugging and monitoring Spark applications difficult, as log messages from the executors may not match the log level expected by the user code.
This PR propagates log level changes to the executors when sc.setLogLevel() is called, and sends the current log level to every newly registered executor.
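
A minimal, self-contained sketch of the message-passing pattern (the names `UpdateExecutorLogLevel` and `setLogLevelIfNeeded` come from the diff below; the toy "backend" plumbing here is only an illustration, not Spark's RPC layer):

```scala
// Toy stand-in for the propagation path: the driver fans the new level out to
// every registered executor, which applies it locally.
case class UpdateExecutorLogLevel(newLogLevel: String)

class ToyExecutorBackend(id: Int) {
  // Mirrors the new case added to CoarseGrainedExecutorBackend.receive, where
  // the real code calls Utils.setLogLevelIfNeeded(newLogLevel).
  def receive: PartialFunction[Any, Unit] = {
    case UpdateExecutorLogLevel(level) =>
      println(s"executor $id: log level set to $level")
  }
}

object LogLevelPropagationDemo extends App {
  val executors = Seq(new ToyExecutorBackend(1), new ToyExecutorBackend(2))
  // Driver side: sc.setLogLevel("DEBUG") asks the scheduler backend to send
  // the new level to all executors; newly registered executors get it too.
  executors.foreach(_.receive(UpdateExecutorLogLevel("DEBUG")))
}
```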

### Does this PR introduce _any_ user-facing change?
No, but with this PR both the driver and the executors will use the same log level.

### How was this patch tested?

Tested manually to verify the same log levels on both driver and executor

Closes #41746 from vinodkc/br_support_setloglevel_executors.

Authored-by: Vinod KC 
Signed-off-by: attilapiros 
---
 .../main/scala/org/apache/spark/SparkContext.scala | 11 --
 .../executor/CoarseGrainedExecutorBackend.scala|  4 
 .../org/apache/spark/internal/config/package.scala |  8 +++
 .../apache/spark/scheduler/SchedulerBackend.scala  |  1 +
 .../cluster/CoarseGrainedClusterMessage.scala  |  7 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala| 20 -
 .../main/scala/org/apache/spark/util/Utils.scala   | 25 +++---
 7 files changed, 69 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 26fdb86d299..f48cb32b319 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -40,7 +40,6 @@ import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, 
BytesWritable, Doub
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, 
SequenceFileInputFormat, TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => 
NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => 
NewFileInputFormat}
-import org.apache.logging.log4j.Level
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
@@ -396,7 +395,10 @@ class SparkContext(config: SparkConf) extends Logging {
 require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
   s"Supplied level $logLevel did not match one of:" +
 s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
-Utils.setLogLevel(Level.toLevel(upperCased))
+Utils.setLogLevelIfNeeded(upperCased)
+if (conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL) && _schedulerBackend != null) {
+  _schedulerBackend.updateExecutorsLogLevel(upperCased)
+}
   }
 
   try {
@@ -585,6 +587,11 @@ class SparkContext(config: SparkConf) extends Logging {
 _dagScheduler = new DAGScheduler(this)
 _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
+if (_conf.get(EXECUTOR_ALLOW_SYNC_LOG_LEVEL)) {
+  _conf.get(SPARK_LOG_LEVEL)
+.foreach(logLevel => 
_schedulerBackend.updateExecutorsLogLevel(logLevel))
+}
+
 val _executorMetricsSource =
   if (_conf.get(METRICS_EXECUTORMETRICS_SOURCE_ENABLED)) {
 Some(new ExecutorMetricsSource)
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ab238626efe..da009f5addb 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -177,6 +177,8 @@ private[spark] class CoarseGrainedExecutorBackend(
 case NonFatal(e) =>
   exitExecutor(1, "Unable to create executor due to " + e.getMessage, 
e)
   }
+case UpdateExecutorLogLevel(newLogLevel) =>
+  Utils.setLogLevelIfNeeded(newLogLevel)
 
 case LaunchTask(data) =>
   if (executor == null) {
@@ -473,6 +475,8 @

[spark] branch master updated: [SPARK-35084][CORE] Fixing --packages in k8s client mode with the driver running inside a POD

2023-01-14 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9dc792d91c9 [SPARK-35084][CORE] Fixing --packages in k8s client mode 
with the driver running inside a POD
9dc792d91c9 is described below

commit 9dc792d91c95f7ce290fff96e979e5540180f8a2
Author: Keunhyun Oh 
AuthorDate: Sat Jan 14 17:29:55 2023 -0800

[SPARK-35084][CORE] Fixing --packages in k8s client mode with the driver 
running inside a POD

### What changes were proposed in this pull request?
Supporting '--packages' in the k8s cluster mode

### Why are the changes needed?
In Spark 3, '--packages' is not supported in k8s cluster mode. I expected to manage dependencies by using packages, as in Spark 2.

Spark 2.4.5


https://github.com/apache/spark/blob/v2.4.5/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

```scala
if (!isMesosCluster && !isStandAloneCluster) {
  // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
  // too for packages that include Python code
  val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
    args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
    args.ivySettingsPath)

  if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
    args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
    if (args.isPython || isInternal(args.primaryResource)) {
      args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
    }
  }

  // install any R packages that may have been passed through --jars or --packages.
  // Spark Packages may contain R source code inside the jar.
  if (args.isR && !StringUtils.isBlank(args.jars)) {
    RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
  }
}
```

Spark 3.0.2


https://github.com/apache/spark/blob/v3.0.2/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

```scala
if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
  // In K8s client mode, when in the driver, add resolved jars early as we might need
  // them at the submit time for artifact downloading.
  // For example we might use the dependencies for downloading
  // files from a Hadoop Compatible fs eg. S3. In this case the user might pass:
  // --packages com.amazonaws:aws-java-sdk:1.7.4:org.apache.hadoop:hadoop-aws:2.7.6
  if (isKubernetesClusterModeDriver) {
    val loader = getSubmitClassLoader(sparkConf)
    for (jar <- resolvedMavenCoordinates.split(",")) {
      addJarToClasspath(jar, loader)
    }
  } else if (isKubernetesCluster) {
    // We need this in K8s cluster mode so that we can upload local deps
    // via the k8s application, like in cluster mode driver
    childClasspath ++= resolvedMavenCoordinates.split(",")
  } else {
    args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
    if (args.isPython || isInternal(args.primaryResource)) {
      args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
    }
  }
}
```

Unlike Spark 2, in Spark 3 the resolved jars are not added to the jar list in the Kubernetes code paths.

### Does this PR introduce _any_ user-facing change?
Unlike Spark 2, the resolved jars are added not at cluster-mode spark-submit time but in the driver (see the sketch below).

This is because Spark 3 added a feature that uploads jars with the "file://" prefix to S3. If the resolved jars were added at spark-submit time, every jar coming from the packages would be uploaded to S3, which was a very bad experience when I tested it.
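
A hedged sketch of the direction described above, expressed against the Spark 3.0.2 excerpt (an illustration of the intent only, not the actual diff, which is truncated below; the extra `mergeFileLists` calls inside the `isKubernetesClusterModeDriver` branch are the assumption):

```scala
if (isKubernetesClusterModeDriver) {
  // Keep the early classpath setup for submit-time artifact downloading...
  val loader = getSubmitClassLoader(sparkConf)
  for (jar <- resolvedMavenCoordinates.split(",")) {
    addJarToClasspath(jar, loader)
  }
  // ...and, in the driver only, also expose the resolved jars to executors,
  // so '--packages' dependencies are distributed without re-uploading every
  // jar to the object store at spark-submit time.
  args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
  if (args.isPython || isInternal(args.primaryResource)) {
    args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
  }
}
```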

### How was this patch tested?
1) Tested the code in my k8s environment.
2) Unit tests.

Closes #38828 from ocworld/SPARK-35084-CORE-k8s-pacakges.

Authored-by: Keunhyun Oh 
Signed-off-by: attilapiros 
---
 .../org/apache/spark/deploy/SparkSubmit.scala  | 24 ++
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 29 ++
 2 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index a701b0ea607..fa19c7918af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -319,21 +319,23 @@ private[spark] class SparkSubmit extends Lo

[spark] branch branch-3.2 updated (4b1e06be85a -> b43e38b38ee)

2022-10-04 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


from 4b1e06be85a [SPARK-40636][CORE] Fix wrong remained shuffles log in 
BlockManagerDecommissioner
 add b43e38b38ee [SPARK-40617][CORE][3.2] Fix race condition at the 
handling of ExecutorMetricsPoller's stageTCMP entries

No new revisions were added by this update.

Summary of changes:
 .../spark/executor/ExecutorMetricsPoller.scala  | 21 -
 .../spark/executor/ExecutorMetricsPollerSuite.scala |  4 ++--
 2 files changed, 14 insertions(+), 11 deletions(-)





[spark] branch branch-3.3 updated: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

2022-10-03 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 90a27757ec1 [SPARK-40617] Fix race condition at the handling of 
ExecutorMetricsPoller's stageTCMP entries
90a27757ec1 is described below

commit 90a27757ec17c2511049114a437f365326e51225
Author: attilapiros 
AuthorDate: Mon Oct 3 06:23:51 2022 -0700

[SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's 
stageTCMP entries

### What changes were proposed in this pull request?

Fix a race condition in ExecutorMetricsPoller between  
`getExecutorUpdates()` and `onTaskStart()` methods
by avoiding removing entries when another stage is not started yet.

### Why are the changes needed?

Spurious failures are reported because of the following assert:

```
22/09/29 09:46:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 3063.0 in stage 1997.0 (TID 677249),5,main]
java.lang.AssertionError: assertion failed: task count shouldn't below 0
    at scala.Predef$.assert(Predef.scala:223)
    at org.apache.spark.executor.ExecutorMetricsPoller.decrementCount$1(ExecutorMetricsPoller.scala:130)
    at org.apache.spark.executor.ExecutorMetricsPoller.$anonfun$onTaskCompletion$3(ExecutorMetricsPoller.scala:135)
    at java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1822)
    at org.apache.spark.executor.ExecutorMetricsPoller.onTaskCompletion(ExecutorMetricsPoller.scala:135)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:737)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
22/09/29 09:46:24 INFO MemoryStore: MemoryStore cleared
22/09/29 09:46:24 INFO BlockManager: BlockManager stopped
22/09/29 09:46:24 INFO ShutdownHookManager: Shutdown hook called
22/09/29 09:46:24 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1664443624160_0001/spark-93efc2d4-84de-494b-a3b7-2cb1c3a45426
```

I have checked the code, and the basic assumption (that there are at least as many `onTaskStart()` calls as `onTaskCompletion()` calls for the same `stageId` & `stageAttemptId` pair) is correct. But there is a race condition between `getExecutorUpdates()` and `onTaskStart()`.

First of all, we have two different threads:
- task runner: executes the task and informs `ExecutorMetricsPoller` about task starts and completions
- heartbeater: uses the `ExecutorMetricsPoller` to get the metrics

To show the race condition, assume a task just finished which was running on its own (no other tasks were running), so this decreases the `count` from 1 to 0.

On the task runner thread, let's say a new task starts, so the execution is in the `onTaskStart()` method. Let's assume `countAndPeaks` is already computed and the counter is 0, but the execution has not yet incremented the counter. So we are between the following two lines:

```scala
val countAndPeaks = stageTCMP.computeIfAbsent((stageId, stageAttemptId),
  _ => TCMP(new AtomicLong(0), new AtomicLongArray(ExecutorMetricType.numMetrics)))
val stageCount = countAndPeaks.count.incrementAndGet()
```

Let's look at the other thread (heartbeater), where `getExecutorUpdates()` is running and it is at the `removeIfInactive()` method:

```scala
def removeIfInactive(k: StageKey, v: TCMP): TCMP = {
  if (v.count.get == 0) {
    logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
    null
  } else {
    v
  }
}
```

And here this entry is removed from `stageTCMP` as the count is 0.

Let's go back to the task runner thread, where we increase the counter to 1, but that value is lost as `stageTCMP` no longer contains an entry for this stage and attempt.

So if a new task comes, instead of 2 we will have 1 in `stageTCMP`, and when those two tasks finish, the second one decreases the counter from 0 to -1. This is when the assert is raised.
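
The following stand-alone sketch reproduces the essence of this interleaving with a plain `ConcurrentHashMap` and `AtomicLong` (a toy illustration, not the real `ExecutorMetricsPoller` code):

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

object StageCountRaceSketch extends App {
  val stageTCMP = new ConcurrentHashMap[(Int, Int), AtomicLong]()
  val key = (1997, 0)
  stageTCMP.put(key, new AtomicLong(0)) // the last running task just finished

  // Task-runner thread: onTaskStart() has already fetched the counter...
  val counter = stageTCMP.computeIfAbsent(key, (k: (Int, Int)) => new AtomicLong(0))

  // ...but before incrementAndGet() runs, the heartbeater executes
  // removeIfInactive() and drops the entry because it still sees count 0.
  stageTCMP.computeIfPresent(key,
    (k: (Int, Int), v: AtomicLong) => if (v.get == 0) null else v)

  counter.incrementAndGet() // the increment is lost: the map has no entry anymore
  println(s"entry still present? ${stageTCMP.containsKey(key)}") // false
}
```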

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test.

I managed to reproduce the issue with a temporary test:

```scala
 test("reproduce assert failure") {
val testMemoryManager = new TestMemoryManag

[spark] branch master updated (08708225417 -> 564a51b64e7)

2022-10-03 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 08708225417 [SPARK-40509][SS][PYTHON][FOLLLOW-UP] Add example for 
applyInPandasWithState followup
 add 564a51b64e7 [SPARK-40617] Fix race condition at the handling of 
ExecutorMetricsPoller's stageTCMP entries

No new revisions were added by this update.

Summary of changes:
 .../spark/executor/ExecutorMetricsPoller.scala  | 21 -
 .../spark/executor/ExecutorMetricsPollerSuite.scala |  4 ++--
 2 files changed, 14 insertions(+), 11 deletions(-)





[spark] branch master updated (ef837ca -> fcc5176)

2022-01-13 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from ef837ca  [SPARK-37905][INFRA] Make `merge_spark_pr.py` set primary 
author from the first commit in case of ties
 add fcc5176  [SPARK-36967][CORE] Report accurate shuffle block size if its 
skewed

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/internal/config/package.scala | 24 ++
 .../org/apache/spark/scheduler/MapStatus.scala | 32 ++-
 .../main/scala/org/apache/spark/util/Utils.scala   | 16 
 .../apache/spark/scheduler/MapStatusSuite.scala| 97 ++
 .../execution/adaptive/OptimizeSkewedJoin.scala| 15 +---
 5 files changed, 169 insertions(+), 15 deletions(-)




[spark] branch master updated: [SPARK-36406][CORE] Avoid unnecessary file operations before delete a write failed file held by DiskBlockObjectWriter

2021-12-20 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6f0b1c9  [SPARK-36406][CORE] Avoid unnecessary file operations before 
delete a write failed file held by DiskBlockObjectWriter
6f0b1c9 is described below

commit 6f0b1c9c9c7aec600358ee15f6d6ac8cf67864e9
Author: yangjie01 
AuthorDate: Mon Dec 20 09:52:38 2021 +0100

[SPARK-36406][CORE] Avoid unnecessary file operations before delete a write 
failed file held by DiskBlockObjectWriter

### What changes were proposed in this pull request?
We always do a file truncate operation before deleting a write-failed file held by `DiskBlockObjectWriter`; a typical process is as follows:

```
if (!success) {
  // This code path only happens if an exception was thrown above before we set success;
  // close our stuff and let the exception be thrown further
  writer.revertPartialWritesAndClose()
  if (file.exists()) {
    if (!file.delete()) {
      logWarning(s"Error deleting ${file}")
    }
  }
}
```
The `revertPartialWritesAndClose` method reverts writes that haven't been committed yet, but that does not seem necessary in this scenario.

So this PR adds a new method to `DiskBlockObjectWriter` named `closeAndDelete()`; the new method just reverts the write metrics and deletes the write-failed file.
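
A self-contained toy sketch of the `closeAndDelete()` idea (a minimal stand-in written for illustration, not Spark's `DiskBlockObjectWriter`):

```scala
import java.io.{File, FileOutputStream}

final class ToyBlockWriter(file: File) {
  private val out = new FileOutputStream(file)
  private var recordsWritten = 0L

  def write(bytes: Array[Byte]): Unit = { out.write(bytes); recordsWritten += 1 }

  /** Revert the write metrics and delete the partially written file in one step. */
  def closeAndDelete(): Unit = {
    try out.close() finally {
      recordsWritten = 0L // metrics reverted; no truncate of the doomed file needed
      if (!file.delete()) println(s"Error deleting $file")
    }
  }
}

object ToyBlockWriterDemo extends App {
  val f = File.createTempFile("toy-block", ".tmp")
  val w = new ToyBlockWriter(f)
  try w.write("partial".getBytes("UTF-8"))
  finally w.closeAndDelete() // error path: no revert-and-truncate before the delete
  println(s"file still exists? ${f.exists()}") // false
}
```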

### Why are the changes needed?
Avoid unnecessary file operations.

### Does this PR introduce _any_ user-facing change?
Adds a new method to `DiskBlockObjectWriter` named `closeAndDelete()`.

### How was this patch tested?
Pass the Jenkins or GitHub Actions builds.

Closes #33628 from LuciferYang/SPARK-36406.

Authored-by: yangjie01 
Signed-off-by: attilapiros 
---
 .../shuffle/sort/BypassMergeSortShuffleWriter.java |  5 +
 .../spark/storage/DiskBlockObjectWriter.scala  | 26 ++
 .../util/collection/ExternalAppendOnlyMap.scala|  7 +-
 .../spark/util/collection/ExternalSorter.scala |  7 +-
 .../spark/storage/DiskBlockObjectWriterSuite.scala | 19 
 5 files changed, 48 insertions(+), 16 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
index 9a5ac6f..da7a518 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
@@ -289,10 +289,7 @@ final class BypassMergeSortShuffleWriter
   try {
 for (DiskBlockObjectWriter writer : partitionWriters) {
   // This method explicitly does _not_ throw exceptions:
-  File file = writer.revertPartialWritesAndClose();
-  if (!file.delete()) {
-logger.error("Error while deleting file {}", 
file.getAbsolutePath());
-  }
+  writer.closeAndDelete();
 }
   } finally {
 partitionWriters = null;
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
index 4170609..3bdae2f 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{BufferedOutputStream, File, FileOutputStream, IOException, 
OutputStream}
 import java.nio.channels.{ClosedByInterruptException, FileChannel}
+import java.nio.file.Files
 import java.util.zip.Checksum
 
 import org.apache.spark.errors.SparkCoreErrors
@@ -119,6 +120,11 @@ private[spark] class DiskBlockObjectWriter(
   private var numRecordsWritten = 0
 
   /**
+   * Keep track the number of written records committed.
+   */
+  private var numRecordsCommitted = 0L
+
+  /**
* Set the checksum that the checksumOutputStream should use
*/
   def setChecksum(checksum: Checksum): Unit = {
@@ -223,6 +229,7 @@ private[spark] class DiskBlockObjectWriter(
   // In certain compression codecs, more bytes are written after streams 
are closed
   writeMetrics.incBytesWritten(committedPosition - reportedPosition)
   reportedPosition = committedPosition
+  numRecordsCommitted += numRecordsWritten
   numRecordsWritten = 0
   fileSegment
 } else {
@@ -273,6 +280,25 @@ private[spark] class DiskBlockObjectWriter(
   }
 
   /**
+   * Reverts write metrics and delete the file held by current 
`DiskBlockObjectWriter`.
+   * Callers should invoke this function when there are runtime exceptions in 
file
+   * writing process an

[spark] branch master updated: [SPARK-35672][CORE][YARN] Pass user classpath entries to executors using config instead of command line

2021-11-16 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 693537f  [SPARK-35672][CORE][YARN] Pass user classpath entries to 
executors using config instead of command line
693537f is described below

commit 693537f7852b8a3b9f3cf3dc21eb1c132f64db0d
Author: Erik Krogen 
AuthorDate: Tue Nov 16 18:08:48 2021 +0100

[SPARK-35672][CORE][YARN] Pass user classpath entries to executors using 
config instead of command line

### What changes were proposed in this pull request?
Refactor the logic for constructing the user classpath from 
`yarn.ApplicationMaster` into `yarn.Client` so that it can be leveraged on the 
executor side as well, instead of having the driver construct it and pass it to 
the executor via command-line arguments. A new method, `getUserClassPath`, is 
added to `CoarseGrainedExecutorBackend` which defaults to `Nil` (consistent 
with the existing behavior where non-YARN resource managers do not configure 
the user classpath). `YarnCoarseGrained [...]

Please note that this is a re-submission of #32810, which was reverted in 
#34082 due to the issues described in [this 
comment](https://issues.apache.org/jira/browse/SPARK-35672?focusedCommentId=17419285&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17419285).
 This PR additionally includes the changes described in #34084 to resolve the 
issue, though this PR has been enhanced to properly handle escape strings, 
unlike #34084.

### Why are the changes needed?
User-provided JARs are made available to executors using a custom 
classloader, so they do not appear on the standard Java classpath. Instead, 
they are passed as a list to the executor which then creates a classloader out 
of the URLs. Currently in the case of YARN, this list of JARs is crafted by the 
Driver (in `ExecutorRunnable`), which then passes the information to the 
executors (`CoarseGrainedExecutorBackend`) by specifying each JAR on the 
executor command line as `--user-class-pat [...]

> /bin/bash: Argument list too long

A [Google 
search](https://www.google.com/search?q=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22&oq=spark%20%22%2Fbin%2Fbash%3A%20argument%20list%20too%20long%22)
 indicates that this is not a theoretical problem and afflicts real users, 
including ours. Passing this list using the configurations instead resolves 
this issue.
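
A hedged sketch of the general pattern (illustrative only; the single comma-separated value and helper names below are assumptions, not Spark's actual config keys): the executor reads one configuration value instead of receiving one command-line argument per JAR, and builds its user classloader from it.

```scala
import java.net.{URL, URLClassLoader}

object UserClasspathFromConf {
  // Build the user classloader from a single comma-separated config value
  // rather than from thousands of --user-class-path CLI arguments.
  def classLoaderFrom(confValue: String): URLClassLoader = {
    val urls = confValue.split(",").filter(_.nonEmpty).map(new URL(_))
    new URLClassLoader(urls, getClass.getClassLoader)
  }

  def main(args: Array[String]): Unit = {
    val loader = classLoaderFrom("file:/tmp/dep1.jar,file:/tmp/dep2.jar")
    println(loader.getURLs.mkString(", "))
  }
}
```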

### Does this PR introduce _any_ user-facing change?
There is one small behavioral change which is a bug fix. Previously the 
`spark.yarn.config.gatewayPath` and `spark.yarn.config.replacementPath` options 
were only applied to executors, meaning they would not work for the driver when 
running in cluster mode. This appears to be a bug; the [documentation for this 
functionality](https://spark.apache.org/docs/latest/running-on-yarn.html) does 
not mention any limitations that this is only for executors. This PR fixes that 
issue.

Additionally, this fixes the main bash argument length issue, allowing for 
larger JAR lists to be passed successfully. Configuration of JARs is identical 
to before, and substitution of environment variables in `spark.jars` or 
`spark.yarn.config.replacementPath` works as expected.

### How was this patch tested?
New unit tests were added in `YarnClusterSuite`. Also, we have been running 
a similar fix internally for 4 months with great success.

Closes #34120 from xkrogen/xkrogen-SPARK-35672-yarn-classpath-list-take2.

Authored-by: Erik Krogen 
Signed-off-by: attilapiros 
---
 .../executor/CoarseGrainedExecutorBackend.scala| 17 ++---
 .../scala/org/apache/spark/executor/Executor.scala |  2 +
 .../CoarseGrainedExecutorBackendSuite.scala| 17 ++---
 .../cluster/k8s/KubernetesExecutorBackend.scala|  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala  |  9 +--
 .../org/apache/spark/deploy/yarn/Client.scala  | 34 -
 .../spark/deploy/yarn/ExecutorRunnable.scala   | 12 ---
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala| 75 ++-
 .../YarnCoarseGrainedExecutorBackend.scala |  8 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 35 +
 .../spark/deploy/yarn/YarnClusterSuite.scala   | 86 +++---
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala | 40 ++
 12 files changed, 280 insertions(+), 57 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4f63ada..43887a7 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spa

[spark] branch master updated (0bba90b -> c29bb02)

2021-10-18 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 0bba90b  [SPARK-36978][SQL] InferConstraints rule should create 
IsNotNull constraints on the accessed nested field instead of the root nested 
type
 add c29bb02  [SPARK-36965][PYTHON] Extend python test runner by logging 
out the temp output files

No new revisions were added by this update.

Summary of changes:
 python/run-tests.py | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)




[spark] branch master updated (8e92ef8 -> 03e48c8)

2021-07-13 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 8e92ef8  [SPARK-35749][SPARK-35773][SQL] Parse unit list interval 
literals as tightest year-month/day-time interval types
 add 03e48c8  [SPARK-35334][K8S] Make Spark more resilient to intermittent 
K8s flakiness

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala | 10 ++
 1 file changed, 10 insertions(+)




[spark] branch master updated (a70e66e -> 4534c0c)

2021-06-07 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from a70e66e  [SPARK-35665][SQL] Resolve UnresolvedAlias in CollectMetrics
 add 4534c0c  [SPARK-35543][CORE] Fix memory leak in 
BlockManagerMasterEndpoint removeRdd

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/storage/BlockManagerMasterEndpoint.scala| 12 +---
 1 file changed, 9 insertions(+), 3 deletions(-)




[spark] branch master updated (d2a535f -> 8b94eff)

2021-05-10 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from d2a535f  [SPARK-34246][FOLLOWUP] Change the definition of 
`findTightestCommonType` for backward compatibility
 add 8b94eff  [SPARK-34736][K8S][TESTS] Kubernetes and Minikube version 
upgrade for integration tests

No new revisions were added by this update.

Summary of changes:
 .../kubernetes/integration-tests/README.md |   4 +-
 .../dev/dev-run-integration-tests.sh   |   6 +-
 .../k8s/integrationtest/KubernetesSuite.scala  |   1 +
 .../deploy/k8s/integrationtest/PVTestsSuite.scala  |   2 +-
 .../backend/minikube/Minikube.scala| 113 -
 5 files changed, 52 insertions(+), 74 deletions(-)




[spark] branch master updated (4e3daa5 -> 738cf7f)

2021-04-29 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 4e3daa5  [SPARK-35254][BUILD] Upgrade SBT to 1.5.1
 add 738cf7f  [SPARK-35009][CORE] Avoid creating multiple python worker 
monitor threads for the same worker and same task context

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/api/python/PythonRunner.scala | 28 --
 1 file changed, 26 insertions(+), 2 deletions(-)




[spark] branch master updated (132cbf0 -> 068b6c8)

2021-04-29 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 132cbf0  [SPARK-35105][SQL] Support multiple paths for ADD 
FILE/JAR/ARCHIVE commands
 add 068b6c8  [SPARK-35234][CORE] Reserve the format of stage failureMessage

No new revisions were added by this update.

Summary of changes:
 .../scala/org/apache/spark/scheduler/DAGScheduler.scala| 14 +-
 1 file changed, 5 insertions(+), 9 deletions(-)




[spark] branch master updated (71133e1 -> 2cb962b)

2021-04-15 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 71133e1  [SPARK-35070][SQL] TRANSFORM not support alias in inputs
 add 2cb962b  [MINOR][CORE] Correct the number of started fetch requests in 
log

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala  | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)




[spark] branch master updated: [SPARK-34779][CORE] ExecutorMetricsPoller should keep stage entry in stageTCMP until a heartbeat occurs

2021-04-01 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f03c7c0  [SPARK-34779][CORE] ExecutorMetricsPoller should keep stage 
entry in stageTCMP until a heartbeat occurs
f03c7c0 is described below

commit f03c7c0e9dd6b6b87049b8546460b7f21c086749
Author: Baohe Zhang 
AuthorDate: Fri Apr 2 07:14:18 2021 +0200

[SPARK-34779][CORE] ExecutorMetricsPoller should keep stage entry in 
stageTCMP until a heartbeat occurs

### What changes were proposed in this pull request?
Allow ExecutorMetricsPoller to keep stage entries in stageTCMP until a 
heartbeat occurs even if the entries have task count = 0.

### Why are the changes needed?
This is an improvement.

The current implementation of ExecutorMetricsPoller keeps a map, stageTCMP 
of (stageId, stageAttemptId) to (count of running tasks, executor metric 
peaks). The entry for the stage is removed on task completion if the task count 
decreases to 0. In the case of an executor with a single core, this leads to 
unnecessary removal and insertion of entries for a given stage.
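
A toy illustration of that churn (stand-alone sketch, not the real poller code): with one core, every task re-creates the stage entry because the previous completion removed it when the count hit 0.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

object StageEntryChurnSketch extends App {
  val stageTCMP = new ConcurrentHashMap[(Int, Int), AtomicLong]()
  var insertions = 0

  def onTaskStart(stage: (Int, Int)): Unit =
    stageTCMP.computeIfAbsent(stage,
      (k: (Int, Int)) => { insertions += 1; new AtomicLong(0) }).incrementAndGet()

  def onTaskCompletion(stage: (Int, Int)): Unit =
    stageTCMP.computeIfPresent(stage,
      (k: (Int, Int), v: AtomicLong) =>
        if (v.decrementAndGet() == 0L) null else v) // pre-PR behavior: drop entry at 0

  val stage = (1, 0)
  (1 to 5).foreach { _ => onTaskStart(stage); onTaskCompletion(stage) }
  println(s"map insertions for 5 sequential tasks of one stage: $insertions") // 5
}
```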

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
A new unit test is added.

Closes #31871 from baohe-zhang/SPARK-34779.

Authored-by: Baohe Zhang 
Signed-off-by: “attilapiros” 
---
 .../spark/executor/ExecutorMetricsPoller.scala | 29 -
 .../executor/ExecutorMetricsPollerSuite.scala  | 48 ++
 2 files changed, 67 insertions(+), 10 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala 
b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala
index 1c1a1ca..0cdb306 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetricsPoller.scala
@@ -53,10 +53,10 @@ private[spark] class ExecutorMetricsPoller(
 
   type StageKey = (Int, Int)
   // Task Count and Metric Peaks
-  private case class TCMP(count: AtomicLong, peaks: AtomicLongArray)
+  private[executor] case class TCMP(count: AtomicLong, peaks: AtomicLongArray)
 
   // Map of (stageId, stageAttemptId) to (count of running tasks, executor 
metric peaks)
-  private val stageTCMP = new ConcurrentHashMap[StageKey, TCMP]
+  private[executor] val stageTCMP = new ConcurrentHashMap[StageKey, TCMP]
 
   // Map of taskId to executor metric peaks
   private val taskMetricPeaks = new ConcurrentHashMap[Long, AtomicLongArray]
@@ -124,17 +124,12 @@ private[spark] class ExecutorMetricsPoller(
*/
   def onTaskCompletion(taskId: Long, stageId: Int, stageAttemptId: Int): Unit 
= {
 // Decrement the task count.
-// Remove the entry from stageTCMP if the task count reaches zero.
 
 def decrementCount(stage: StageKey, countAndPeaks: TCMP): TCMP = {
   val countValue = countAndPeaks.count.decrementAndGet()
-  if (countValue == 0L) {
-logDebug(s"removing (${stage._1}, ${stage._2}) from stageTCMP")
-null
-  } else {
-logDebug(s"stageTCMP: (${stage._1}, ${stage._2}) -> " + countValue)
-countAndPeaks
-  }
+  assert(countValue >= 0, "task count shouldn't below 0")
+  logDebug(s"stageTCMP: (${stage._1}, ${stage._2}) -> " + countValue)
+  countAndPeaks
 }
 
 stageTCMP.computeIfPresent((stageId, stageAttemptId), decrementCount)
@@ -176,6 +171,20 @@ private[spark] class ExecutorMetricsPoller(
 
 stageTCMP.replaceAll(getUpdateAndResetPeaks)
 
+def removeIfInactive(k: StageKey, v: TCMP): TCMP = {
+  if (v.count.get == 0) {
+logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
+null
+  } else {
+v
+  }
+}
+
+// Remove the entry from stageTCMP if the task count reaches zero.
+executorUpdates.foreach { case (k, _) =>
+  stageTCMP.computeIfPresent(k, removeIfInactive)
+}
+
 executorUpdates
   }
 
diff --git 
a/core/src/test/scala/org/apache/spark/executor/ExecutorMetricsPollerSuite.scala
 
b/core/src/test/scala/org/apache/spark/executor/ExecutorMetricsPollerSuite.scala
new file mode 100644
index 000..e471864
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/executor/ExecutorMetricsPollerSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at

[spark-website] branch asf-site updated: Promoting SSH remotes and document their purpose, mentioning GitBox

2021-03-19 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new c7e0d8e  Promoting SSH remotes and document their purpose, mentioning 
GitBox
c7e0d8e is described below

commit c7e0d8e60838cefabf5b06f45e32abb64612db01
Author: attilapiros 
AuthorDate: Fri Mar 19 19:16:34 2021 +0100

Promoting SSH remotes and document their purpose, mentioning GitBox

Author: attilapiros 

Closes #329 from attilapiros/update_committer_page.
---
 committers.md| 25 ++---
 site/committers.html | 29 -
 2 files changed, 30 insertions(+), 24 deletions(-)

diff --git a/committers.md b/committers.md
index 88c8fb5..529fdfe 100644
--- a/committers.md
+++ b/committers.md
@@ -170,20 +170,21 @@ To use the `merge_spark_pr.py` script described below, you
 will need to add a git remote called `apache` at 
`https://github.com/apache/spark`, 
 as well as one called `apache-github` at `git://github.com/apache/spark`.
 
-You will likely also have a remote `origin` pointing to your fork of Spark, and
-`upstream` pointing to the `apache/spark` GitHub repo. 
+The `apache` (the default value of `PUSH_REMOTE_NAME` environment variable) is 
the remote used for pushing the squashed commits
+and `apache-github` (default value of `PR_REMOTE_NAME`) is the remote used for 
pulling the changes.
+By using two separate remotes for these two actions the result of the 
`merge_spark_pr.py` can be tested without pushing it
+into the official Spark repo just by specifying your fork in the 
`PUSH_REMOTE_NAME` variable.
 
-If correct, your `git remote -v` should look like:
+After cloning your fork of Spark you already have a remote `origin` pointing 
there. So if correct, your `git remote -v`
+contains at least these lines:
 
 ```
-apache https://github.com/apache/spark.git (fetch)
-apache https://github.com/apache/spark.git (push)
-apache-github  git://github.com/apache/spark (fetch)
-apache-github  git://github.com/apache/spark (push)
-origin https://github.com/[your username]/spark.git (fetch)
-origin https://github.com/[your username]/spark.git (push)
-upstream   https://github.com/apache/spark.git (fetch)
-upstream   https://github.com/apache/spark.git (push)
+apache git@github.com:apache/spark-website.git (fetch)
+apache git@github.com:apache/spark-website.git (push)
+apache-github  git@github.com:apache/spark-website.git (fetch)
+apache-github  git@github.com:apache/spark-website.git (push)
+origin git@github.com:[your username]/spark-website.git (fetch)
+origin git@github.com:[your username]/spark-website.git (push)
 ```
 
 For the `apache` repo, you will need to set up command-line authentication to 
GitHub. This may
@@ -192,6 +193,8 @@ include setting up an SSH key and/or personal access token. 
See:
 - https://help.github.com/articles/connecting-to-github-with-ssh/
 - 
https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/
 
+To check whether the necessary write access are already granted please visit 
[GitBox](https://gitbox.apache.org/setup/).
+
 Ask `dev@spark.apache.org` if you have trouble with these steps, or want help doing your first merge.
 
 Merge Script
diff --git a/site/committers.html b/site/committers.html
index 9cc9d97..7760439 100644
--- a/site/committers.html
+++ b/site/committers.html
@@ -618,19 +618,20 @@ it. So please don’t add any test commits or 
anything like that, only real
 will need to add a git remote called apache at https://github.com/apache/spark, 
 as well as one called apache-github at git://github.com/apache/spark.
 
-You will likely also have a remote origin pointing to your fork of Spark, and
-upstream pointing to 
the apache/spark 
GitHub repo.
-
-If correct, your git 
remote -v should look like:
-
-apache   https://github.com/apache/spark.git (fetch)
-apache https://github.com/apache/spark.git (push)
-apache-github  git://github.com/apache/spark (fetch)
-apache-github  git://github.com/apache/spark (push)
-origin https://github.com/[your username]/spark.git (fetch)
-origin https://github.com/[your username]/spark.git (push)
-upstream   https://github.com/apache/spark.git (fetch)
-upstream   https://github.com/apache/spark.git (push)
+The apache (the 
default value of PUSH_REMOTE_NAME environment variable) is the remote 
used for pushing the squashed commits
+and apache-github 
(default value of PR_REMOTE_NAME) is the remote used for pulling the 
changes.
+By using two separate remotes for these two actions the result of the merge_spark_pr.py can be 
tested without pushing it
+into the official Spark repo just by specifying your fork in the PUSH_REMOTE_NAME 
variable.
+
+After cloning your fork of Spark you already have a remote origin pointing there. So 
if correct, your git remote 
-v

[spark-website] branch asf-site updated: Add Attila Zsolt Piros to committers

2021-03-19 Thread attilapiros
This is an automated email from the ASF dual-hosted git repository.

attilapiros pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new fc65ac1  Add Attila Zsolt Piros to committers
fc65ac1 is described below

commit fc65ac1f9dbc3cb25fc58703f74aefb51a157c32
Author: attilapiros 
AuthorDate: Fri Mar 19 12:25:25 2021 +0100

Add Attila Zsolt Piros to committers

Author: attilapiros 

Closes #328 from attilapiros/add_committer.
---
 committers.md| 1 +
 site/committers.html | 4 
 2 files changed, 5 insertions(+)

diff --git a/committers.md b/committers.md
index ae84112..88c8fb5 100644
--- a/committers.md
+++ b/committers.md
@@ -62,6 +62,7 @@ navigation:
 |Sean Owen|Databricks|
 |Tejas Patil|Facebook|
 |Nick Pentreath|IBM|
+|Attila Zsolt Piros|Cloudera|
 |Anirudh Ramanathan|Rockset|
 |Imran Rashid|Cloudera|
 |Charles Reiss|University of Virginia|
diff --git a/site/committers.html b/site/committers.html
index 5985718..9cc9d97 100644
--- a/site/committers.html
+++ b/site/committers.html
@@ -419,6 +419,10 @@
   IBM
 
 
+  Attila Zsolt Piros
+  Cloudera
+
+
   Anirudh Ramanathan
   Rockset
 
