[spark] branch master updated: [SPARK-27369][CORE] Setup resources when Standalone Worker starts up
This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 7cbe01e [SPARK-27369][CORE] Setup resources when Standalone Worker starts up

7cbe01e is described below

commit 7cbe01e8efc3f6cd3a0cac4bcfadea8fcc74a955
Author: wuyi
AuthorDate: Wed Jun 26 19:19:00 2019 -0700

[SPARK-27369][CORE] Setup resources when Standalone Worker starts up

## What changes were proposed in this pull request?

To support GPU-aware scheduling in Standalone (cluster mode), the Worker should be able to set up resources (e.g. GPU/FPGA) when it starts up.

As the driver and executor do, the Worker has two ways (resourceFile & resourceDiscoveryScript) to set up resources when it starts up. Users can use `SPARK_WORKER_OPTS` to apply resource configs to the Worker in the form of "-Dx=y". For example,

```
SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=2 \
-Dspark.worker.resource.fpga.amount=1 \
-Dspark.worker.resource.fpga.discoveryScript=/Users/wuyi/tmp/getFPGAResources.sh \
-Dspark.worker.resourcesFile=/Users/wuyi/tmp/worker-resource-file"
```

## How was this patch tested?

Tested manually in Standalone locally:
- Worker can start up normally when no resources are configured
- Worker should fail to start up when an exception is thrown during resource setup (e.g. unknown directory, parse failure)
- Worker can set up resources from a resource file
- Worker can set up resources from discovery scripts
- Worker should set up resources from both the resource file and discovery scripts when both are configured

Closes #24841 from Ngone51/dev-worker-resources-setup.
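The `-Dx=y` convention can be illustrated with a short Python sketch that turns the example `SPARK_WORKER_OPTS` value into per-resource requests. It is a simplified illustration, not how Spark itself handles these options (the JVM turns `-D` flags into system properties, which `SparkConf` then reads). `parse_worker_opts` and `resource_requests` are hypothetical helpers, and grouping keys as `spark.worker.resource.<name>.<field>` is inferred from the example keys above.

```python
import shlex


def parse_worker_opts(opts):
    """Collect '-Dkey=value' tokens from an options string into a dict.

    Tokens that are not of the form -Dkey=value are ignored.
    """
    conf = {}
    for token in shlex.split(opts):
        if token.startswith("-D") and "=" in token:
            key, value = token[2:].split("=", 1)
            conf[key] = value
    return conf


def resource_requests(conf, prefix="spark.worker"):
    """Group '<prefix>.resource.<name>.<field>' keys by resource name.

    'spark.worker.resourcesFile' does not match, because the prefix
    checked here ends with 'resource.' (note the trailing dot).
    """
    head = prefix + ".resource."
    requests = {}
    for key, value in conf.items():
        if key.startswith(head):
            name, _, field = key[len(head):].partition(".")
            requests.setdefault(name, {})[field] = value
    return requests


# The value the Worker would receive for the SPARK_WORKER_OPTS example:
# inside double quotes, bash removes each backslash-newline pair.
opts = ("-Dspark.worker.resource.gpu.amount=2 "
        "-Dspark.worker.resource.fpga.amount=1 "
        "-Dspark.worker.resource.fpga.discoveryScript=/Users/wuyi/tmp/getFPGAResources.sh "
        "-Dspark.worker.resourcesFile=/Users/wuyi/tmp/worker-resource-file")
conf = parse_worker_opts(opts)
print(resource_requests(conf))
```

Here `gpu` ends up with only an `amount`, while `fpga` also carries its `discoveryScript`; the resources file path stays a separate top-level setting.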
Authored-by: wuyi Signed-off-by: Xingbo Jiang --- .../org/apache/spark/deploy/worker/Worker.scala| 25 +++--- .../org/apache/spark/internal/config/Worker.scala | 11 ++ .../apache/spark/deploy/worker/WorkerSuite.scala | 2 +- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index b432feb..ac7a1b9 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -41,6 +41,8 @@ import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ import org.apache.spark.metrics.{MetricsSystem, MetricsSystemInstances} +import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.rpc._ import org.apache.spark.util.{SparkUncaughtExceptionHandler, ThreadUtils, Utils} @@ -54,6 +56,7 @@ private[deploy] class Worker( workDirPath: String = null, val conf: SparkConf, val securityMgr: SecurityManager, +resourceFileOpt: Option[String] = None, externalShuffleServiceSupplier: Supplier[ExternalShuffleService] = null) extends ThreadSafeRpcEndpoint with Logging { @@ -176,6 +179,9 @@ private[deploy] class Worker( masterRpcAddresses.length // Make sure we can register with all masters at the same time ) + // visible for tests + private[deploy] var resources: Map[String, ResourceInformation] = _ + var coresUsed = 0 var memoryUsed = 0 @@ -208,6 +214,7 @@ private[deploy] class Worker( logInfo("Spark home: " + sparkHome) createWorkDir() startExternalShuffleService() +setupWorkerResources() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() @@ -220,6 +227,16 @@ private[deploy] class Worker( metricsSystem.getServletHandlers.foreach(webUi.attachHandler) } + private def setupWorkerResources(): Unit = { +try { + 
resources = getOrDiscoverAllResources(conf, SPARK_WORKER_PREFIX, resourceFileOpt) +} catch { + case e: Exception => +logError("Failed to setup worker resources: ", e) +System.exit(1) +} + } + /** * Change to use the new master. * @@ -785,7 +802,8 @@ private[deploy] object Worker extends Logging { val conf = new SparkConf val args = new WorkerArguments(argStrings, conf) val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores, - args.memory, args.masters, args.workDir, conf = conf) + args.memory, args.masters, args.workDir, conf = conf, + resourceFileOpt = conf.get(SPARK_WORKER_RESOURCE_FILE)) // With external shuffle service enabled, if we request to launch multiple workers on one host, // we can only successfully launch the first worker and the rest fails, because with the port // bound, we may
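The diff makes `setupWorkerResources` call `getOrDiscoverAllResources` and, on any exception, log the error and call `System.exit(1)`, so a misconfigured Worker fails at startup. The Python sketch below is a simplified model of that flow, not Spark's `ResourceUtils`. The helper names are hypothetical, `discover` stands in for running a discovery script, and the precedence rule (a resource-file entry is used first; a script runs only for requested resources the file does not cover) is an assumption of the sketch.

```python
import sys


def setup_worker_resources(requests, file_allocations, discover):
    """Resolve every requested resource to a list of addresses.

    requests:         {name: {"amount": "2", "discoveryScript": "/path"}, ...}
    file_allocations: {name: [address, ...]} already read from a resources file
    discover:         callable(script_path) -> [address, ...]
    Raises ValueError if a resource cannot be resolved or has too few addresses.
    """
    resources = {}
    for name, req in requests.items():
        if name in file_allocations:            # assumption: the file entry wins
            addresses = list(file_allocations[name])
        elif "discoveryScript" in req:          # otherwise run the discovery script
            addresses = list(discover(req["discoveryScript"]))
        else:
            raise ValueError(f"no resources file entry or discovery script for {name!r}")
        needed = int(req.get("amount", "0"))
        if len(addresses) < needed:
            raise ValueError(f"{name}: found {len(addresses)} addresses, need {needed}")
        resources[name] = addresses
    return resources


def start_worker(requests, file_allocations, discover):
    """Fail fast, mirroring the diff's logError(...) followed by System.exit(1)."""
    try:
        return setup_worker_resources(requests, file_allocations, discover)
    except Exception as e:
        print(f"Failed to setup worker resources: {e}", file=sys.stderr)
        sys.exit(1)


# GPUs come from the (already parsed) resources file; the FPGA comes from a script.
requests = {"gpu": {"amount": "2"},
            "fpga": {"amount": "1", "discoveryScript": "/tmp/getFPGAResources.sh"}}
print(start_worker(requests, {"gpu": ["0", "1"]}, lambda script: ["fpga0"]))
```

Exiting the process, rather than starting with a partial resource map, matches the tested behavior listed in the commit message: the Worker should fail to start when resource setup throws.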
[spark] branch master updated: [SPARK-27992][PYTHON] Allow Python to join with connection thread to propagate errors
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new c277afb [SPARK-27992][PYTHON] Allow Python to join with connection thread to propagate errors

c277afb is described below

commit c277afb12b61a91272568dd46380c0d0a9958989
Author: Bryan Cutler
AuthorDate: Wed Jun 26 13:05:41 2019 -0700

[SPARK-27992][PYTHON] Allow Python to join with connection thread to propagate errors

## What changes were proposed in this pull request?

Currently, with `toLocalIterator()` and with `toPandas()` when Arrow is enabled, if the Spark job running in the background serving thread fails, the error is caught and sent to Python through the PySpark serializer. This is not ideal: it only catches a SparkException, it does not handle an error that occurs in the serializer, and each method needs its own special handling to propagate the error.

This PR instead returns the Python Server object along with the serving port and authentication info, so that the Python caller can join with the serving thread. During the call to join, the serving thread's Future is completed either successfully or with an exception. In the latter case, the exception is propagated to Python through the Py4j call.

## How was this patch tested?

Existing tests

Closes #24834 from BryanCutler/pyspark-propagate-server-error-SPARK-27992.
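The join-based error propagation can be sketched in plain Python with `concurrent.futures`: a background task plays the role of the JVM serving thread, and `join()` blocks on its Future, returning normally or re-raising whatever the task raised, whatever its type. `ServingJob`, `serve_items`, and `broken_serializer` are hypothetical names; the real change hands a JVM server object to Python through Py4j, which this sketch does not model.

```python
from concurrent.futures import ThreadPoolExecutor


class ServingJob:
    """Hypothetical stand-in for the server object returned to Python:
    `serve` runs on a background thread, and join() waits for it."""

    def __init__(self, serve):
        self._pool = ThreadPoolExecutor(max_workers=1)
        self._future = self._pool.submit(serve)

    def join(self):
        """Block until serving finishes; re-raise whatever `serve` raised."""
        try:
            return self._future.result()   # raises the task's exception, if any
        finally:
            self._pool.shutdown(wait=False)


def serve_items(items, sink):
    """Pretend to stream `items` to the client (here: append to a list)."""
    for item in items:
        sink.append(item)


def broken_serializer():
    """A failure that is not a Spark job error, e.g. from serialization."""
    raise ValueError("cannot serialize row")


received = []
ServingJob(lambda: serve_items([1, 2, 3], received)).join()   # succeeds
print(received)

try:
    ServingJob(broken_serializer).join()
except ValueError as e:
    print("propagated to the caller:", e)
```

Because the error travels through the Future rather than through the data stream, the serving code needs no per-method special casing for failures.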
Authored-by: Bryan Cutler Signed-off-by: Bryan Cutler --- .../org/apache/spark/api/python/PythonRDD.scala| 90 +++-- .../main/scala/org/apache/spark/api/r/RRDD.scala | 4 +- .../apache/spark/security/SocketAuthHelper.scala | 19 + .../apache/spark/security/SocketAuthServer.scala | 94 +++--- .../main/scala/org/apache/spark/util/Utils.scala | 4 +- python/pyspark/rdd.py | 26 -- python/pyspark/sql/dataframe.py| 10 ++- python/pyspark/sql/tests/test_arrow.py | 2 +- .../main/scala/org/apache/spark/sql/Dataset.scala | 38 - 9 files changed, 161 insertions(+), 126 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index fe25c3a..5b80e14 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.network.util.JavaUtils import org.apache.spark.rdd.RDD -import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer} +import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer, SocketFuncServer} import org.apache.spark.util._ @@ -137,8 +137,9 @@ private[spark] object PythonRDD extends Logging { * (effectively a collect()), but allows you to run on a certain subset of partitions, * or to enable local execution. * - * @return 2-tuple (as a Java array) with the port number of a local socket which serves the - * data collected from this job, and the secret for authentication. + * @return 3-tuple (as a Java array) with the port number of a local socket which serves the + * data collected from this job, the secret for authentication, and a socket auth + * server object that can be used to join the JVM serving thread in Python. 
*/ def runJob( sc: SparkContext, @@ -156,8 +157,9 @@ private[spark] object PythonRDD extends Logging { /** * A helper function to collect an RDD as an iterator, then serve it via socket. * - * @return 2-tuple (as a Java array) with the port number of a local socket which serves the - * data collected from this job, and the secret for authentication. + * @return 3-tuple (as a Java array) with the port number of a local socket which serves the + * data collected from this job, the secret for authentication, and a socket auth + * server object that can be used to join the JVM serving thread in Python. */ def collectAndServe[T](rdd: RDD[T]): Array[Any] = { serveIterator(rdd.collect().iterator, s"serve RDD ${rdd.id}") @@ -168,58 +170,59 @@ private[spark] object PythonRDD extends Logging { * are collected as separate jobs, by order of index. Partition data is first requested by a * non-zero integer to start a collection job. The response is prefaced by an integer with 1 * meaning partition data will be served, 0 meaning the local iterator has been consumed, - * and -1 meaining an error occurred during collection. This function is used by + * and -1 meaning an error occurred during collection. This function i
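The status-code protocol described in the doc comment above (a non-zero request, then 1 = partition data follows, 0 = local iterator consumed, -1 = error during collection) can be modeled as a small Python client loop. This is a sketch under stated assumptions, not PySpark's implementation: the 4-byte big-endian framing, sending `1` as the request before every partition, and the count-prefixed payload read by `read_counted_ints` are hypothetical choices that let the example run on in-memory buffers.

```python
import io
import struct


def read_int(stream):
    """Read one 4-byte big-endian signed int (assumed framing)."""
    data = stream.read(4)
    if len(data) < 4:
        raise EOFError("connection closed")
    return struct.unpack(">i", data)[0]


def write_int(stream, value):
    stream.write(struct.pack(">i", value))


def local_iterator(inp, out, read_partition):
    """Yield partitions one at a time following the status-code protocol."""
    while True:
        write_int(out, 1)                  # non-zero: request the next partition
        status = read_int(inp)
        if status == 1:                    # partition data follows
            yield read_partition(inp)
        elif status == 0:                  # all partitions consumed
            return
        elif status == -1:                 # the server reported a failure
            raise RuntimeError("an error occurred during collection")
        else:
            raise ValueError(f"unexpected status code {status}")


def read_counted_ints(stream):
    """Hypothetical payload format: a count followed by that many ints."""
    count = read_int(stream)
    return [read_int(stream) for _ in range(count)]


def frames(*ints):
    return b"".join(struct.pack(">i", i) for i in ints)


# Two partitions ([10, 20] and [30]) followed by the "consumed" status.
inp = io.BytesIO(frames(1, 2, 10, 20, 1, 1, 30, 0))
out = io.BytesIO()
print(list(local_iterator(inp, out, read_counted_ints)))
```

Since partitions are requested one at a time, the client only ever holds one partition's data, which is the point of a local iterator compared with a full collect.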
[spark] branch master updated: [SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the blacklisted entries
This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 7eeca02 [SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the blacklisted entries

7eeca02 is described below

commit 7eeca029404c8cc1e2c3e7ae8728b90582e25d76
Author: Dongjoon Hyun
AuthorDate: Wed Jun 26 18:56:06 2019 +

[SPARK-28157][CORE] Make SHS clear KVStore `LogInfo`s for the blacklisted entries

## What changes were proposed in this pull request?

In Spark 2.4.0/2.3.2/2.2.3, [SPARK-24948](https://issues.apache.org/jira/browse/SPARK-24948) delegated access permission checks to the file system and maintains a blacklist of all event log files that failed once at reading. The blacklisted log files are released after `CLEAN_INTERVAL_S` seconds.

However, released files whose sizes don't change are ignored forever due to the `info.fileSize < entry.getLen()` condition (previously [here](https://github.com/apache/spark/commit/3c96937c7b1d7a010b630f4b98fd22dafc37808b#diff-a7befb99e7bd7e3ab5c46c2568aa5b3eR454) and now at [shouldReloadLog](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala#L571)), which always returns `false` when the size is the same as the exi [...]

This PR aims to remove the existing entry from `KVStore` when it goes to the blacklist.

## How was this patch tested?

Passed Jenkins with the updated test case.

Closes #24966 from dongjoon-hyun/SPARK-28157.
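The failure mode can be reproduced with a toy Python model of the History Server scan loop. It is not Spark code, and all names are hypothetical. `listing` stands in for the KVStore `LogInfo` entries (recorded before a log is parsed), `should_reload` mimics the `info.fileSize < entry.getLen()` check, and `clean` releases blacklist entries after an interval, playing the role of `CLEAN_INTERVAL_S`. A log that was unreadable, then becomes readable without changing size, is only picked up again if its listing entry was deleted when it was blacklisted.

```python
class ToyHistoryProvider:
    """Toy model of the SHS scan loop (not Spark code).

    listing:   path -> file size recorded at the last scan (stands in for LogInfo)
    blacklist: path -> time the path was blacklisted
    """

    def __init__(self, delete_on_blacklist):
        self.listing = {}
        self.blacklist = {}
        self.delete_on_blacklist = delete_on_blacklist

    def should_reload(self, path, size):
        known = self.listing.get(path)
        return known is None or known < size     # like info.fileSize < entry.getLen()

    def scan(self, files, readable, now):
        """files: {path: size}; readable(path) -> bool. Returns the paths parsed."""
        parsed = []
        for path, size in files.items():
            if path in self.blacklist or not self.should_reload(path, size):
                continue
            self.listing[path] = size            # entry is recorded before parsing
            if readable(path):
                parsed.append(path)
            else:
                self.blacklist[path] = now
                if self.delete_on_blacklist:     # the behavior SPARK-28157 adds
                    del self.listing[path]
        return parsed

    def clean(self, now, interval):
        """Release blacklist entries older than `interval`."""
        self.blacklist = {p: t for p, t in self.blacklist.items() if now - t <= interval}


def replay(delete_on_blacklist):
    """Unreadable log -> blacklisted -> permission fixed (same size) -> rescan."""
    provider = ToyHistoryProvider(delete_on_blacklist)
    files = {"accessDenied": 100}
    state = {"readable": False}
    provider.scan(files, lambda p: state["readable"], now=0)
    state["readable"] = True                     # permission-only change
    provider.clean(now=100, interval=10)         # blacklist entry expires
    return provider.scan(files, lambda p: state["readable"], now=100)


print(replay(delete_on_blacklist=False))   # without the fix: the log stays ignored
print(replay(delete_on_blacklist=True))    # with the fix: the log is parsed again
```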
Authored-by: Dongjoon Hyun Signed-off-by: DB Tsai --- .../apache/spark/deploy/history/FsHistoryProvider.scala | 3 +++ .../spark/deploy/history/FsHistoryProviderSuite.scala | 15 ++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 98265ff..f2ee599 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -536,6 +536,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // We don't have read permissions on the log file logWarning(s"Unable to read log $path", e.getCause) blacklist(path) +// SPARK-28157 We should remove this blacklisted entry from the KVStore +// to handle permission-only changes with the same file sizes later. +listing.delete(classOf[LogInfo], path.toString) case e: Exception => logError("Exception while merging application listings", e) } finally { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 791814b..571c6e3 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -1122,17 +1122,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { writeFile(accessGranted, true, None, SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None), SparkListenerApplicationEnd(5L)) +var isReadable = false val mockedFs = spy(provider.fs) doThrow(new AccessControlException("Cannot read accessDenied file")).when(mockedFs).open( - argThat((path: Path) => path.getName.toLowerCase(Locale.ROOT) == "accessdenied")) + argThat((path: Path) => 
path.getName.toLowerCase(Locale.ROOT) == "accessdenied" && +!isReadable)) val mockedProvider = spy(provider) when(mockedProvider.fs).thenReturn(mockedFs) updateAndCheck(mockedProvider) { list => list.size should be(1) } -writeFile(accessDenied, true, None, - SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None), - SparkListenerApplicationEnd(5L)) // Doing 2 times in order to check the blacklist filter too updateAndCheck(mockedProvider) { list => list.size should be(1) @@ -1140,8 +1139,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val accessDeniedPath = new Path(accessDenied.getPath) assert(mockedProvider.isBlacklisted(accessDeniedPath)) clock.advance(24 * 60 * 60 * 1000 + 1) // add a bit more than 1d +isReadable = true mockedProvider.cleanLogs() -assert(!mockedProvider.isBlacklisted(accessDeniedPath)) +updateAndCheck(mockedProvider) { list => + assert(!mockedProvider.isBlacklisted(accessDeniedPath)) + assert(
[spark] branch branch-2.3 updated: [SPARK-28164] Fix usage description of `start-slave.sh`
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.3 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.3 by this push:
new a054a00 [SPARK-28164] Fix usage description of `start-slave.sh`

a054a00 is described below

commit a054a00908119a380477f225f8d0cb897169adea
Author: shivusondur
AuthorDate: Wed Jun 26 12:42:33 2019 -0500

[SPARK-28164] Fix usage description of `start-slave.sh`

## What changes were proposed in this pull request?

Updated the usage message in sbin/start-slave.sh: the argument is moved to the first position.

## How was this patch tested?

Tested locally by starting a master, starting a slave with `./start-slave.sh spark://: -c 1`, and opening a Spark shell with `./spark-shell --master spark://:`.

Closes #24974 from shivusondur/jira28164.

Authored-by: shivusondur Signed-off-by: Sean Owen (cherry picked from commit bd232b98b470a609472a4ea8df912f8fad06ba06) Signed-off-by: Sean Owen --- sbin/start-slave.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sbin/start-slave.sh b/sbin/start-slave.sh index 8c268b8..4e57f74 100755 --- a/sbin/start-slave.sh +++ b/sbin/start-slave.sh @@ -40,7 +40,7 @@ fi CLASS="org.apache.spark.deploy.worker.Worker" if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then - echo "Usage: ./sbin/start-slave.sh [options] " + echo "Usage: ./sbin/start-slave.sh [options]" pattern="Usage:" pattern+="\|Using Spark's default log4j profile:" pattern+="\|Registered signal handlers for" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-28164] Fix usage description of `start-slave.sh`
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new bd232b9 [SPARK-28164] Fix usage description of `start-slave.sh`

bd232b9 is described below

commit bd232b98b470a609472a4ea8df912f8fad06ba06
Author: shivusondur
AuthorDate: Wed Jun 26 12:42:33 2019 -0500

[SPARK-28164] Fix usage description of `start-slave.sh`

## What changes were proposed in this pull request?

Updated the usage message in sbin/start-slave.sh: the argument is moved to the first position.

## How was this patch tested?

Tested locally by starting a master, starting a slave with `./start-slave.sh spark://: -c 1`, and opening a Spark shell with `./spark-shell --master spark://:`.

Closes #24974 from shivusondur/jira28164.

Authored-by: shivusondur Signed-off-by: Sean Owen --- sbin/start-slave.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sbin/start-slave.sh b/sbin/start-slave.sh index 247c9e2..2cb17a0 100755 --- a/sbin/start-slave.sh +++ b/sbin/start-slave.sh @@ -40,7 +40,7 @@ fi CLASS="org.apache.spark.deploy.worker.Worker" if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then - echo "Usage: ./sbin/start-slave.sh [options] " + echo "Usage: ./sbin/start-slave.sh [options]" pattern="Usage:" pattern+="\|Using Spark's default log4j profile:" pattern+="\|Started daemon with process name"
[spark] branch branch-2.4 updated: [SPARK-28164] Fix usage description of `start-slave.sh`
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
new eb66d3b [SPARK-28164] Fix usage description of `start-slave.sh`

eb66d3b is described below

commit eb66d3b222d2eb6738c1242508b121176370617b
Author: shivusondur
AuthorDate: Wed Jun 26 12:42:33 2019 -0500

[SPARK-28164] Fix usage description of `start-slave.sh`

## What changes were proposed in this pull request?

Updated the usage message in sbin/start-slave.sh: the argument is moved to the first position.

## How was this patch tested?

Tested locally by starting a master, starting a slave with `./start-slave.sh spark://: -c 1`, and opening a Spark shell with `./spark-shell --master spark://:`.

Closes #24974 from shivusondur/jira28164.

Authored-by: shivusondur Signed-off-by: Sean Owen (cherry picked from commit bd232b98b470a609472a4ea8df912f8fad06ba06) Signed-off-by: Sean Owen --- sbin/start-slave.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sbin/start-slave.sh b/sbin/start-slave.sh index 8c268b8..4e57f74 100755 --- a/sbin/start-slave.sh +++ b/sbin/start-slave.sh @@ -40,7 +40,7 @@ fi CLASS="org.apache.spark.deploy.worker.Worker" if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then - echo "Usage: ./sbin/start-slave.sh [options] " + echo "Usage: ./sbin/start-slave.sh [options]" pattern="Usage:" pattern+="\|Using Spark's default log4j profile:" pattern+="\|Registered signal handlers for"
[spark] branch master updated: [SPARK-28005][YARN] Remove unnecessary log from SparkRackResolver
This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
new 8313015 [SPARK-28005][YARN] Remove unnecessary log from SparkRackResolver

8313015 is described below

commit 8313015e8dd11aafc5b887773e467892d15134de
Author: Gabor Somogyi
AuthorDate: Wed Jun 26 09:50:54 2019 -0500

[SPARK-28005][YARN] Remove unnecessary log from SparkRackResolver

## What changes were proposed in this pull request?

SparkRackResolver generates an INFO message every time it is called with 0 arguments. This PR removes it because it is too verbose: `coreResolve` now returns an empty result immediately when no host names are given.

## How was this patch tested?

Existing unit tests + spark-shell.

Closes #24935 from gaborgsomogyi/SPARK-28005.

Authored-by: Gabor Somogyi Signed-off-by: Imran Rashid --- .../main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala| 3 +++ 1 file changed, 3 insertions(+) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala index cab3272..51be932 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/SparkRackResolver.scala @@ -67,6 +67,9 @@ private[spark] class SparkRackResolver(conf: Configuration) extends Logging { } private def coreResolve(hostNames: Seq[String]): Seq[Node] = { +if (hostNames.isEmpty) { + return Seq.empty +} val nodes = new ArrayBuffer[Node] // dnsToSwitchMapping is thread-safe val rNameList = dnsToSwitchMapping.resolve(hostNames.toList.asJava).asScala
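The guard added to `coreResolve` is a plain early return: with no host names there is nothing to look up, so the resolution call is skipped entirely. A toy Python model (not Spark code; `ToyRackResolver` and its counter are hypothetical) makes the effect visible. The `"/default-rack"` fallback is an assumed placeholder for unknown hosts.

```python
class ToyRackResolver:
    """Toy stand-in for SparkRackResolver (not Spark code)."""

    def __init__(self, topology):
        self.topology = topology      # host -> rack; plays the role of dnsToSwitchMapping
        self.mapping_calls = 0        # counts how often the mapping is consulted

    def _resolve_with_mapping(self, hosts):
        # Stands in for dnsToSwitchMapping.resolve(...). The commit message
        # reports an INFO message on every call with no hosts; the guard in
        # core_resolve skips this call in that case.
        self.mapping_calls += 1
        return [self.topology.get(h, "/default-rack") for h in hosts]

    def core_resolve(self, hosts):
        if not hosts:                 # the early return added by SPARK-28005
            return []
        return self._resolve_with_mapping(hosts)


resolver = ToyRackResolver({"host1": "/rack1"})
print(resolver.core_resolve([]), resolver.mapping_calls)
print(resolver.core_resolve(["host1", "host2"]), resolver.mapping_calls)
```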
[spark] branch branch-2.4 updated: [SPARK-27100][SQL][2.4] Use `Array` instead of `Seq` in `FilePartition` to prevent StackOverflowError
This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 680c1b6 [SPARK-27100][SQL][2.4] Use `Array` instead of `Seq` in `FilePartition` to prevent StackOverflowError

680c1b6 is described below

commit 680c1b6aa4ca7e4f8abb9261b8566b6f82e05d96
Author: Parth Chandra
AuthorDate: Wed Jun 26 07:48:27 2019 +

[SPARK-27100][SQL][2.4] Use `Array` instead of `Seq` in `FilePartition` to prevent StackOverflowError

… prevent `StackOverflowError`

ShuffleMapTask's partition field is a FilePartition, and FilePartition's 'files' field is a Stream$cons, which is essentially a linked list. It is therefore serialized recursively. If the number of files in each partition is, say, 1 files, recursing into a linked list of length 1 overflows the stack.

The problem occurs only with bucketed partitions. The corresponding implementation for non-bucketed partitions uses a StreamBuffer. The proposed change applies the same approach to bucketed partitions.

Existing unit tests. Added a new unit test, which fails without the patch. Manual testing on the dataset used to reproduce the problem.

Closes #24957 from parthchandra/branch-2.4.
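The stack overflow comes from serializing a linked list by recursing into each tail. A rough Python analogue shows the same shape of failure. Here `pickle` stands in for Java serialization and nested `(head, tail)` tuples stand in for `Stream$cons` cells; both substitutions are assumptions of the sketch, and none of it is Spark code. Python raises `RecursionError` where the JVM throws `StackOverflowError`, and flattening the list into an array-like sequence first avoids the problem, which is the same idea as the fix.

```python
import pickle


def to_cons(items):
    """Build a linked list of (head, tail) pairs, like a chain of cons cells."""
    node = None
    for item in reversed(items):
        node = (item, node)
    return node


def cons_to_list(node):
    """Walk the linked list iteratively into a flat list (the 'use an array' idea)."""
    out = []
    while node is not None:
        head, node = node
        out.append(head)
    return out


def serializes(obj):
    """True if pickle can serialize obj, False if it hits the recursion limit."""
    try:
        pickle.dumps(obj)
        return True
    except RecursionError:
        return False


files = [f"part-{i:06d}.parquet" for i in range(200_000)]
linked = to_cons(files)
print(serializes(linked))                 # nested: one recursion level per file
print(serializes(cons_to_list(linked)))   # flat: elements are written in a loop
```

The recursion depth of the nested version grows with the number of files, so any fixed stack or recursion limit is eventually exceeded; the flat version's depth does not depend on the list length.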
Authored-by: Parth Chandra Signed-off-by: DB Tsai --- .../spark/sql/execution/DataSourceScanExec.scala | 14 +++--- .../sql/execution/datasources/FileScanRDD.scala| 2 +- .../datasources/FileSourceStrategySuite.scala | 2 +- .../spark/sql/sources/BucketedReadSuite.scala | 50 ++ 4 files changed, 59 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 5433c30..d22fe64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -187,14 +187,14 @@ case class FileSourceScanExec( private var metadataTime = 0L - @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = { + @transient private lazy val selectedPartitions: Array[PartitionDirectory] = { val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) val startTime = System.nanoTime() val ret = relation.location.listFiles(partitionFilters, dataFilters) val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000 metadataTime = timeTakenMs ret - } + }.toArray override lazy val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = { val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) { @@ -377,7 +377,7 @@ case class FileSourceScanExec( private def createBucketedReadRDD( bucketSpec: BucketSpec, readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Seq[PartitionDirectory], + selectedPartitions: Array[PartitionDirectory], fsRelation: HadoopFsRelation): RDD[InternalRow] = { logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val filesGroupedToBuckets = @@ -402,7 +402,7 @@ case class FileSourceScanExec( } val filePartitions = Seq.tabulate(bucketSpec.numBuckets) { bucketId => - FilePartition(bucketId, 
prunedFilesGroupedToBuckets.getOrElse(bucketId, Nil)) + FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) } new FileScanRDD(fsRelation.sparkSession, readFile, filePartitions) @@ -418,7 +418,7 @@ case class FileSourceScanExec( */ private def createNonBucketedReadRDD( readFile: (PartitionedFile) => Iterator[InternalRow], - selectedPartitions: Seq[PartitionDirectory], + selectedPartitions: Array[PartitionDirectory], fsRelation: HadoopFsRelation): RDD[InternalRow] = { val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes @@ -449,7 +449,7 @@ case class FileSourceScanExec( partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts)) } } -}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse) +}.sortBy(_.length)(implicitly[Ordering[Long]].reverse) val partitions = new ArrayBuffer[FilePartition] val currentFiles = new ArrayBuffer[PartitionedFile] @@ -461,7 +461,7 @@ case class FileSourceScanExec( val newPartition = FilePartition( partitions.size, -currentFiles.toArray.toSeq) // Copy to a new Array. +currentFiles.toArray) // Copy to a new Array. partitions += newPartition } currentFiles.clear() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ex