[spark] branch master updated: [SPARK-27369][CORE] Setup resources when Standalone Worker starts up

2019-06-26 Thread jiangxb1987
This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7cbe01e  [SPARK-27369][CORE] Setup resources when Standalone Worker 
starts up
7cbe01e is described below

commit 7cbe01e8efc3f6cd3a0cac4bcfadea8fcc74a955
Author: wuyi 
AuthorDate: Wed Jun 26 19:19:00 2019 -0700

[SPARK-27369][CORE] Setup resources when Standalone Worker starts up

## What changes were proposed in this pull request?

To support GPU-aware scheduling in Standalone (cluster mode), the Worker should 
be able to set up resources (e.g. GPU/FPGA) when it starts up.

As the driver and executor do, the Worker has two ways (resourceFile & 
resourceDiscoveryScript) to set up resources when it starts up. Users can use 
`SPARK_WORKER_OPTS` to apply resource configs to the Worker in the form of "-Dx=y". 
For example,
```
SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=2 \
   -Dspark.worker.resource.fpga.amount=1 \
   -Dspark.worker.resource.fpga.discoveryScript=/Users/wuyi/tmp/getFPGAResources.sh \
   -Dspark.worker.resourcesFile=/Users/wuyi/tmp/worker-resource-file"
```
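
To illustrate how options in the "-Dx=y" form map onto per-resource settings, here is a minimal sketch. It is not Spark's parsing code: `WorkerResourceOpts` and `parse` are hypothetical names, and the only assumption taken from this commit message is the `spark.worker.resource.<name>.<setting>` key layout.

```scala
// Hypothetical helper, not part of Spark: groups "-Dspark.worker.resource.<name>.<setting>=<value>"
// tokens by resource name.
object WorkerResourceOpts {
  private val Prefix = "-Dspark.worker.resource."

  def parse(opts: String): Map[String, Map[String, String]] = {
    opts.split("\\s+").toSeq
      .filter(_.startsWith(Prefix))
      .flatMap { token =>
        // Split "gpu.amount=2" into key "gpu.amount" and value "2".
        token.stripPrefix(Prefix).split("=", 2) match {
          case Array(key, value) =>
            // Split "gpu.amount" into resource name "gpu" and setting "amount".
            key.split("\\.", 2) match {
              case Array(name, setting) => Some((name, setting, value))
              case _ => None
            }
          case _ => None
        }
      }
      .groupBy(_._1)
      .map { case (name, entries) => name -> entries.map(e => e._2 -> e._3).toMap }
  }
}
```

Note that `spark.worker.resourcesFile` does not match the per-resource prefix, so this sketch ignores it; in the patch it is read separately via `SPARK_WORKER_RESOURCE_FILE`.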
## How was this patch tested?

Tested manually in Standalone locally:

- Worker could start up normally when no resources are configured

- Worker should fail to start up when an exception is thrown during resource 
setup (e.g. unknown directory, parse failure)

- Worker could set up resources from a resource file

- Worker could set up resources from discovery scripts

- Worker should set up resources from both the resource file & discovery scripts 
when both are configured.

Closes #24841 from Ngone51/dev-worker-resources-setup.

Authored-by: wuyi 
Signed-off-by: Xingbo Jiang 
---
 .../org/apache/spark/deploy/worker/Worker.scala| 25 +++---
 .../org/apache/spark/internal/config/Worker.scala  | 11 ++
 .../apache/spark/deploy/worker/WorkerSuite.scala   |  2 +-
 3 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index b432feb..ac7a1b9 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -41,6 +41,8 @@ import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
 import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances}
+import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
 import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, 
Utils}
 
@@ -54,6 +56,7 @@ private[deploy] class Worker(
 workDirPath: String = null,
 val conf: SparkConf,
 val securityMgr: SecurityManager,
+resourceFileOpt: Option[String] = None,
 externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null)
   extends ThreadSafeRpcEndpoint with Logging {
 
@@ -176,6 +179,9 @@ private[deploy] class Worker(
 masterRpcAddresses.length // Make sure we can register with all masters at 
the same time
   )
 
+  // visible for tests
+  private[deploy] var resources: Map[String, ResourceInformation] = _
+
   var coresUsed = 0
   var memoryUsed = 0
 
@@ -208,6 +214,7 @@ private[deploy] class Worker(
 logInfo("Spark home: " + sparkHome)
 createWorkDir()
 startExternalShuffleService()
+setupWorkerResources()
 webUi = new WorkerWebUI(this, workDir, webUiPort)
 webUi.bind()
 
@@ -220,6 +227,16 @@ private[deploy] class Worker(
 metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
   }
 
+  private def setupWorkerResources(): Unit = {
+try {
+  resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, 
resourceFileOpt)
+} catch {
+  case e: Exception =>
+logError("Failed to setup worker resources: ", e)
+System.exit(1)
+}
+  }
+
   /**
* Change to use the new master.
*
@@ -785,7 +802,8 @@ private[deploy] object Worker extends Logging {
 val conf = new SparkConf
 val args = new WorkerArguments(argStrings, conf)
 val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, 
args.cores,
-  args.memory, args.masters, args.workDir, conf = conf)
+  args.memory, args.masters, args.workDir, conf = conf,
+  resourceFileOpt = conf.get(SPARK_WORKER_RESOURCE_FILE))
 // With external shuffle service enabled, if we request to launch multiple 
workers on one host,
 // we can only successfully launch the first worker and the rest fails, 
because with the port
 // bound, we may 

[spark] branch master updated: [SPARK-27992][PYTHON] Allow Python to join with connection thread to propagate errors

2019-06-26 Thread cutlerb

cutlerb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c277afb  [SPARK-27992][PYTHON] Allow Python to join with connection 
thread to propagate errors
c277afb is described below

commit c277afb12b61a91272568dd46380c0d0a9958989
Author: Bryan Cutler 
AuthorDate: Wed Jun 26 13:05:41 2019 -0700

[SPARK-27992][PYTHON] Allow Python to join with connection thread to 
propagate errors

## What changes were proposed in this pull request?

Currently with `toLocalIterator()` and `toPandas()` with Arrow enabled, if 
the Spark job being run in the background serving thread errors, the error is 
caught and sent to Python through the PySpark serializer.
This is not the ideal solution because it only catches a SparkException, 
it won't handle an error that occurs in the serializer, and each method has to 
have its own special handling to propagate the error.

This PR instead returns the Python Server object along with the serving 
port and authentication info, so that it allows the Python caller to join with 
the serving thread. During the call to join, the serving thread Future is 
completed either successfully or with an exception. In the latter case, the 
exception will be propagated to Python through the Py4j call.
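
The join-and-propagate idea can be sketched in isolation as follows. This is a simplified illustration, not the `SocketAuthServer` code from this commit: `JoinableServer`, `JoinDemo`, and the 10-second default timeout are hypothetical. The serving thread completes a `Promise` with the outcome of its work, and `join` rethrows any failure in the caller.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for a server whose work runs on a background thread.
class JoinableServer(work: () => Unit) {
  private val done = Promise[Unit]()

  private val thread = new Thread(new Runnable {
    // Record success or the thrown exception instead of losing it on this thread.
    override def run(): Unit = done.complete(Try(work()))
  })
  thread.setDaemon(true)
  thread.start()

  // Blocks until the serving thread finishes; rethrows its exception, if any.
  def join(timeout: Duration = 10.seconds): Unit =
    Await.result(done.future, timeout)
}

object JoinDemo {
  def succeeding(): Try[Unit] =
    Try(new JoinableServer(() => ()).join())

  def failing(): Try[Unit] =
    Try(new JoinableServer(() => throw new IllegalStateException("serving failed")).join())
}
```

With this shape the caller needs no per-method error handling: any exception raised on the serving thread surfaces from the single `join` call.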

## How was this patch tested?

Existing tests

Closes #24834 from BryanCutler/pyspark-propagate-server-error-SPARK-27992.

Authored-by: Bryan Cutler 
Signed-off-by: Bryan Cutler 
---
 .../org/apache/spark/api/python/PythonRDD.scala| 90 +++--
 .../main/scala/org/apache/spark/api/r/RRDD.scala   |  4 +-
 .../apache/spark/security/SocketAuthHelper.scala   | 19 +
 .../apache/spark/security/SocketAuthServer.scala   | 94 +++---
 .../main/scala/org/apache/spark/util/Utils.scala   |  4 +-
 python/pyspark/rdd.py  | 26 --
 python/pyspark/sql/dataframe.py| 10 ++-
 python/pyspark/sql/tests/test_arrow.py |  2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 38 -
 9 files changed, 161 insertions(+), 126 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fe25c3a..5b80e14 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
-import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer}
+import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer, 
SocketFuncServer}
 import org.apache.spark.util._
 
 
@@ -137,8 +137,9 @@ private[spark] object PythonRDD extends Logging {
* (effectively a collect()), but allows you to run on a certain subset of 
partitions,
* or to enable local execution.
*
-   * @return 2-tuple (as a Java array) with the port number of a local socket 
which serves the
-   * data collected from this job, and the secret for authentication.
+   * @return 3-tuple (as a Java array) with the port number of a local socket 
which serves the
+   * data collected from this job, the secret for authentication, and 
a socket auth
+   * server object that can be used to join the JVM serving thread in 
Python.
*/
   def runJob(
   sc: SparkContext,
@@ -156,8 +157,9 @@ private[spark] object PythonRDD extends Logging {
   /**
* A helper function to collect an RDD as an iterator, then serve it via 
socket.
*
-   * @return 2-tuple (as a Java array) with the port number of a local socket 
which serves the
-   * data collected from this job, and the secret for authentication.
+   * @return 3-tuple (as a Java array) with the port number of a local socket 
which serves the
+   * data collected from this job, the secret for authentication, and 
a socket auth
+   * server object that can be used to join the JVM serving thread in 
Python.
*/
   def collectAndServe[T](rdd: RDD[T]): Array[Any] = {
 serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}")
@@ -168,58 +170,59 @@ private[spark] object PythonRDD extends Logging {
* are collected as separate jobs, by order of index. Partition data is 
first requested by a
* non-zero integer to start a collection job. The response is prefaced by 
an integer with 1
* meaning partition data will be served, 0 meaning the local iterator has 
been consumed,
-   * and -1 meaining an error occurred during collection. This function is 
used by
+   * and -1 meaning an error occurred during collection. This function i

[spark] branch master updated: [SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the blacklisted entries

2019-06-26 Thread dbtsai

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7eeca02  [SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the 
blacklisted entries
7eeca02 is described below

commit 7eeca029404c8cc1e2c3e7ae8728b90582e25d76
Author: Dongjoon Hyun 
AuthorDate: Wed Jun 26 18:56:06 2019 +

[SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the blacklisted 
entries

## What changes were proposed in this pull request?

In Spark 2.4.0/2.3.2/2.2.3, 
[SPARK-24948](https://issues.apache.org/jira/browse/SPARK-24948) delegated 
access permission checks to the file system, and maintains a blacklist of all 
event log files that failed to be read once. The blacklisted log files are released 
back after `CLEAN_INTERVAL_S` seconds.

However, the released files whose sizes don't change are ignored forever 
due to the `info.fileSize < entry.getLen()` condition (previously 
[here](https://github.com/apache/spark/commit/3c96937c7b1d7a010b630f4b98fd22dafc37808b#diff-a7befb99e7bd7e3ab5c46c2568aa5b3eR454)
 and now at 
[shouldReloadLog](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L571))
 which returns `false` always when the size is the same with the exi [...]

This PR aims to remove the existing entry from `KVStore` when it goes to 
the blacklist.
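
The effect of removing the entry can be sketched with an in-memory map in place of the real `KVStore`. The `Listing` class and its methods below are hypothetical and heavily simplified; only the `fileSize < length` comparison is taken from the description above.

```scala
import scala.collection.mutable

// Simplified stand-in for the stored log metadata.
final case class LogInfo(path: String, fileSize: Long)

// Hypothetical in-memory listing, used here instead of the real KVStore.
class Listing {
  private val entries = mutable.Map.empty[String, LogInfo]

  def put(info: LogInfo): Unit = entries(info.path) = info

  def delete(path: String): Unit = { entries.remove(path); () }

  // A known file is reprocessed only if it grew; an unknown file is always processed.
  def shouldProcess(path: String, currentLen: Long): Boolean =
    entries.get(path) match {
      case Some(info) => info.fileSize < currentLen
      case None => true
    }
}
```

While the stale entry exists, a file whose permissions changed but whose size did not is skipped; once the entry is deleted, the same file is treated as new and is read again.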

## How was this patch tested?

Pass the Jenkins with the updated test case.

Closes #24966 from dongjoon-hyun/SPARK-28157.

Authored-by: Dongjoon Hyun 
Signed-off-by: DB Tsai 
---
 .../apache/spark/deploy/history/FsHistoryProvider.scala   |  3 +++
 .../spark/deploy/history/FsHistoryProviderSuite.scala | 15 ++-
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 98265ff..f2ee599 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -536,6 +536,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 // We don't have read permissions on the log file
 logWarning(s"Unable to read log $path", e.getCause)
 blacklist(path)
+// SPARK-28157 We should remove this blacklisted entry from the 
KVStore
+// to handle permission-only changes with the same file sizes 
later.
+listing.delete(classOf[LogInfo], path.toString)
   case e: Exception =>
 logError("Exception while merging application listings", e)
 } finally {
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 791814b..571c6e3 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1122,17 +1122,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
Matchers with Logging {
 writeFile(accessGranted, true, None,
   SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 
1L, "test", None),
   SparkListenerApplicationEnd(5L))
+var isReadable = false
 val mockedFs = spy(provider.fs)
 doThrow(new AccessControlException("Cannot read accessDenied 
file")).when(mockedFs).open(
-  argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == 
"accessdenied"))
+  argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == 
"accessdenied" &&
+!isReadable))
 val mockedProvider = spy(provider)
 when(mockedProvider.fs).thenReturn(mockedFs)
 updateAndCheck(mockedProvider) { list =>
   list.size should be(1)
 }
-writeFile(accessDenied, true, None,
-  SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, 
"test", None),
-  SparkListenerApplicationEnd(5L))
 // Doing 2 times in order to check the blacklist filter too
 updateAndCheck(mockedProvider) { list =>
   list.size should be(1)
@@ -1140,8 +1139,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with 
Matchers with Logging {
 val accessDeniedPath = new Path(accessDenied.getPath)
 assert(mockedProvider.isBlacklisted(accessDeniedPath))
 clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d
+isReadable = true
 mockedProvider.cleanLogs()
-assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+updateAndCheck(mockedProvider) { list =>
+  assert(!mockedProvider.isBlacklisted(accessDeniedPath))
+  assert(

[spark] branch branch-2.3 updated: [SPARK-28164] Fix usage description of `start-slave.sh`

2019-06-26 Thread srowen

srowen pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new a054a00  [SPARK-28164] Fix usage description of `start-slave.sh`
a054a00 is described below

commit a054a00908119a380477f225f8d0cb897169adea
Author: shivusondur 
AuthorDate: Wed Jun 26 12:42:33 2019 -0500

[SPARK-28164] Fix usage description of `start-slave.sh`

## What changes were proposed in this pull request?

updated the usage message in sbin/start-slave.sh.
 argument moved to first

## How was this patch tested?
tested locally with
Starting master
starting slave with (./start-slave.sh spark://: -c 1
and opening spark shell with ./spark-shell --master spark://:

Closes #24974 from shivusondur/jira28164.

Authored-by: shivusondur 
Signed-off-by: Sean Owen 
(cherry picked from commit bd232b98b470a609472a4ea8df912f8fad06ba06)
Signed-off-by: Sean Owen 
---
 sbin/start-slave.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sbin/start-slave.sh b/sbin/start-slave.sh
index 8c268b8..4e57f74 100755
--- a/sbin/start-slave.sh
+++ b/sbin/start-slave.sh
@@ -40,7 +40,7 @@ fi
 CLASS="org.apache.spark.deploy.worker.Worker"
 
 if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/start-slave.sh [options] "
+  echo "Usage: ./sbin/start-slave.sh  [options]"
   pattern="Usage:"
   pattern+="\|Using Spark's default log4j profile:"
   pattern+="\|Registered signal handlers for"


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-28164] Fix usage description of `start-slave.sh`

2019-06-26 Thread srowen

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bd232b9  [SPARK-28164] Fix usage description of `start-slave.sh`
bd232b9 is described below

commit bd232b98b470a609472a4ea8df912f8fad06ba06
Author: shivusondur 
AuthorDate: Wed Jun 26 12:42:33 2019 -0500

[SPARK-28164] Fix usage description of `start-slave.sh`

## What changes were proposed in this pull request?

updated the usage message in sbin/start-slave.sh.
 argument moved to first

## How was this patch tested?
tested locally with
Starting master
starting slave with (./start-slave.sh spark://: -c 1
and opening spark shell with ./spark-shell --master spark://:

Closes #24974 from shivusondur/jira28164.

Authored-by: shivusondur 
Signed-off-by: Sean Owen 
---
 sbin/start-slave.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sbin/start-slave.sh b/sbin/start-slave.sh
index 247c9e2..2cb17a0 100755
--- a/sbin/start-slave.sh
+++ b/sbin/start-slave.sh
@@ -40,7 +40,7 @@ fi
 CLASS="org.apache.spark.deploy.worker.Worker"
 
 if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/start-slave.sh [options] "
+  echo "Usage: ./sbin/start-slave.sh  [options]"
   pattern="Usage:"
   pattern+="\|Using Spark's default log4j profile:"
   pattern+="\|Started daemon with process name"





[spark] branch branch-2.4 updated: [SPARK-28164] Fix usage description of `start-slave.sh`

2019-06-26 Thread srowen

srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new eb66d3b  [SPARK-28164] Fix usage description of `start-slave.sh`
eb66d3b is described below

commit eb66d3b222d2eb6738c1242508b121176370617b
Author: shivusondur 
AuthorDate: Wed Jun 26 12:42:33 2019 -0500

[SPARK-28164] Fix usage description of `start-slave.sh`

## What changes were proposed in this pull request?

updated the usage message in sbin/start-slave.sh.
 argument moved to first

## How was this patch tested?
tested locally with
Starting master
starting slave with (./start-slave.sh spark://: -c 1
and opening spark shell with ./spark-shell --master spark://:

Closes #24974 from shivusondur/jira28164.

Authored-by: shivusondur 
Signed-off-by: Sean Owen 
(cherry picked from commit bd232b98b470a609472a4ea8df912f8fad06ba06)
Signed-off-by: Sean Owen 
---
 sbin/start-slave.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sbin/start-slave.sh b/sbin/start-slave.sh
index 8c268b8..4e57f74 100755
--- a/sbin/start-slave.sh
+++ b/sbin/start-slave.sh
@@ -40,7 +40,7 @@ fi
 CLASS="org.apache.spark.deploy.worker.Worker"
 
 if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
-  echo "Usage: ./sbin/start-slave.sh [options] "
+  echo "Usage: ./sbin/start-slave.sh  [options]"
   pattern="Usage:"
   pattern+="\|Using Spark's default log4j profile:"
   pattern+="\|Registered signal handlers for"





[spark] branch master updated: [SPARK-28005][YARN] Remove unnecessary log from SparkRackResolver

2019-06-26 Thread irashid

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8313015  [SPARK-28005][YARN] Remove unnecessary log from 
SparkRackResolver
8313015 is described below

commit 8313015e8dd11aafc5b887773e467892d15134de
Author: Gabor Somogyi 
AuthorDate: Wed Jun 26 09:50:54 2019 -0500

[SPARK-28005][YARN] Remove unnecessary log from SparkRackResolver

## What changes were proposed in this pull request?

SparkRackResolver generates an INFO message every time it is called with 0 
arguments.
In this PR I've removed that message because it is too verbose.
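
The early-return guard used for this can be sketched as below. `RackLookup` and its `backendResolve` method are hypothetical stand-ins for the real resolver and its underlying mapping; the counter exists only to make the skipped call observable.

```scala
// Hypothetical resolver: the backend lookup is skipped entirely for empty input.
class RackLookup {
  var backendCalls = 0

  // Stand-in for a lookup that logs on every invocation.
  private def backendResolve(hosts: Seq[String]): Seq[String] = {
    backendCalls += 1
    hosts.map(h => s"/default-rack/$h")
  }

  def resolve(hosts: Seq[String]): Seq[String] =
    if (hosts.isEmpty) Seq.empty else backendResolve(hosts)
}
```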

## How was this patch tested?

Existing unit tests + spark-shell.

Closes #24935 from gaborgsomogyi/SPARK-28005.

Authored-by: Gabor Somogyi 
Signed-off-by: Imran Rashid 
---
 .../main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala| 3 +++
 1 file changed, 3 insertions(+)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala
index cab3272..51be932 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala
@@ -67,6 +67,9 @@ private[spark] class SparkRackResolver(conf: Configuration) 
extends Logging {
   }
 
   private def coreResolve(hostNames: Seq[String]): Seq[Node] = {
+if (hostNames.isEmpty) {
+  return Seq.empty
+}
 val nodes = new ArrayBuffer[Node]
 // dnsToSwitchMapping is thread-safe
 val rNameList = dnsToSwitchMapping.resolve(hostNames.toList.asJava).asScala





[spark] branch branch-2.4 updated: [SPARK-27100][SQL][2.4] Use `Array` instead of `Seq` in `FilePartition` to prevent StackOverflowError

2019-06-26 Thread dbtsai

dbtsai pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 680c1b6  [SPARK-27100][SQL][2.4] Use `Array` instead of `Seq` in 
`FilePartition` to prevent StackOverflowError
680c1b6 is described below

commit 680c1b6aa4ca7e4f8abb9261b8566b6f82e05d96
Author: Parth Chandra 
AuthorDate: Wed Jun 26 07:48:27 2019 +

[SPARK-27100][SQL][2.4] Use `Array` instead of `Seq` in `FilePartition` to 
prevent StackOverflowError


ShuffleMapTask's partition field is a FilePartition and FilePartition's 
'files' field is a Stream$cons which is essentially a linked list. It is 
therefore serialized recursively.
If the number of files in each partition is, say, 1 files, recursing 
into a linked list of length 1 overflows the stack

The problem is only in Bucketed partitions. The corresponding 
implementation for non Bucketed partitions uses a StreamBuffer. The proposed 
change applies the same for Bucketed partitions.
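
As a rough illustration of why a flat array helps, the sketch below flattens a cons-style chain into an `Array` before handing it to Java serialization, which writes array elements in a loop rather than by nested recursion. `FlatFiles`, `Chain`, and `Link` are hypothetical names; this is not the `FilePartition` code.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer

object FlatFiles {
  // Hypothetical linked representation, similar in shape to a cons-based stream.
  sealed trait Chain
  case object End extends Chain
  final case class Link(file: String, rest: Chain) extends Chain

  // Walks the chain iteratively (tail recursion) and copies it into a flat array.
  def toArray(chain: Chain): Array[String] = {
    @tailrec
    def loop(cur: Chain, acc: ArrayBuffer[String]): Array[String] = cur match {
      case End => acc.toArray
      case Link(file, rest) => loop(rest, acc += file)
    }
    loop(chain, ArrayBuffer.empty[String])
  }

  // Serializes a value with plain Java serialization and returns the byte count.
  def serializedSize(value: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.size()
  }
}
```

Serializing the flattened array keeps the serializer's stack depth independent of the number of files, whereas serializing the linked form field by field would nest one level per element.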

Existing unit tests. Added new unit test. The unit test fails without the 
patch. Manual testing on dataset used to reproduce the problem.

Closes #24957 from parthchandra/branch-2.4.

Authored-by: Parth Chandra 
Signed-off-by: DB Tsai 
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 14 +++---
 .../sql/execution/datasources/FileScanRDD.scala|  2 +-
 .../datasources/FileSourceStrategySuite.scala  |  2 +-
 .../spark/sql/sources/BucketedReadSuite.scala  | 50 ++
 4 files changed, 59 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5433c30..d22fe64 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -187,14 +187,14 @@ case class FileSourceScanExec(
 
   private var metadataTime = 0L
 
-  @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+  @transient private lazy val selectedPartitions: Array[PartitionDirectory] = {
 val optimizerMetadataTimeNs = 
relation.location.metadataOpsTimeNs.getOrElse(0L)
 val startTime = System.nanoTime()
 val ret = relation.location.listFiles(partitionFilters, dataFilters)
 val timeTakenMs = ((System.nanoTime() - startTime) + 
optimizerMetadataTimeNs) / 1000 / 1000
 metadataTime = timeTakenMs
 ret
-  }
+  }.toArray
 
   override lazy val (outputPartitioning, outputOrdering): (Partitioning, 
Seq[SortOrder]) = {
 val bucketSpec = if 
(relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -377,7 +377,7 @@ case class FileSourceScanExec(
   private def createBucketedReadRDD(
   bucketSpec: BucketSpec,
   readFile: (PartitionedFile) => Iterator[InternalRow],
-  selectedPartitions: Seq[PartitionDirectory],
+  selectedPartitions: Array[PartitionDirectory],
   fsRelation: HadoopFsRelation): RDD[InternalRow] = {
 logInfo(s"Planning with ${bucketSpec.numBuckets} buckets")
 val filesGroupedToBuckets =
@@ -402,7 +402,7 @@ case class FileSourceScanExec(
 }
 
 val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId =>
-  FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, 
Nil))
+  FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, 
Array.empty))
 }
 
 new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions)
@@ -418,7 +418,7 @@ case class FileSourceScanExec(
*/
   private def createNonBucketedReadRDD(
   readFile: (PartitionedFile) => Iterator[InternalRow],
-  selectedPartitions: Seq[PartitionDirectory],
+  selectedPartitions: Array[PartitionDirectory],
   fsRelation: HadoopFsRelation): RDD[InternalRow] = {
 val defaultMaxSplitBytes =
   fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
@@ -449,7 +449,7 @@ case class FileSourceScanExec(
 partition.values, file.getPath.toUri.toString, 0, file.getLen, 
hosts))
 }
   }
-}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
+}.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
 
 val partitions = new ArrayBuffer[FilePartition]
 val currentFiles = new ArrayBuffer[PartitionedFile]
@@ -461,7 +461,7 @@ case class FileSourceScanExec(
 val newPartition =
   FilePartition(
 partitions.size,
-currentFiles.toArray.toSeq) // Copy to a new Array.
+currentFiles.toArray) // Copy to a new Array.
 partitions += newPartition
   }
   currentFiles.clear()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ex