Repository: spark

Updated Branches:
  refs/heads/master 969ffd631 -> c8affec21
[SPARK-22203][SQL] Add job description for file listing Spark jobs

## What changes were proposed in this pull request?

The user may be confused by file listing jobs that launch some 10000 tasks. We can add a job description to these jobs so that the user can figure out what they are doing.

## How was this patch tested?

A new unit test.

Before:

<img width="343" alt="screen shot 2017-10-04 at 3 22 09 pm" src="https://user-images.githubusercontent.com/1000778/31202567-f78d15c0-a917-11e7-841e-11b8bf8f0032.png">

After:

<img width="473" alt="screen shot 2017-10-04 at 3 13 51 pm" src="https://user-images.githubusercontent.com/1000778/31202576-fc01e356-a917-11e7-9c2b-7bf80b153adb.png">

Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #19432 from zsxwing/SPARK-22203.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8affec2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8affec2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8affec2

Branch: refs/heads/master
Commit: c8affec21c91d638009524955515fc143ad86f20
Parents: 969ffd6
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Wed Oct 4 20:58:48 2017 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Wed Oct 4 20:58:48 2017 -0700

----------------------------------------------------------------------
 .../datasources/InMemoryFileIndex.scala         | 85 ++++++++++++--------
 .../sql/test/DataFrameReaderWriterSuite.scala   | 31 +++++++
 2 files changed, 81 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
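The heart of the change (first diff below) is a set-and-restore pattern around the distributed listing job: remember the caller's job description, set a descriptive one before launching the job, and put the old one back in a `finally` block. As a rough, self-contained sketch of that pattern outside Spark's own code base, something like the following could be used; the `local[2]` session, the object name, and the literal `"spark.job.description"` key (the value behind the `private[spark]` constant `SparkContext.SPARK_JOB_DESCRIPTION` used in the patch) are illustrative assumptions, not part of the commit.

```scala
import org.apache.spark.sql.SparkSession

object JobDescriptionSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; any existing SparkSession would do.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("job-description-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Remember whatever description the caller may have set on this thread,
    // so surrounding code keeps its own label after this block finishes.
    // "spark.job.description" is the key behind SparkContext.SPARK_JOB_DESCRIPTION,
    // which is private[spark] and therefore not accessible from user code.
    val previousDescription = sc.getLocalProperty("spark.job.description")
    try {
      sc.setJobDescription("Listing leaf files and directories for 3 paths:<br/>/tmp/a, ...")
      // Any job launched from this thread now shows the description above in the Spark UI.
      sc.parallelize(Seq("/tmp/a", "/tmp/b", "/tmp/c"), 3).map(_.length).collect()
    } finally {
      // Restore the previous description (passing null simply clears it).
      sc.setJobDescription(previousDescription)
    }
    spark.stop()
  }
}
```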
http://git-wip-us.apache.org/repos/asf/spark/blob/c8affec2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 203d449..318ada0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
@@ -187,42 +188,56 @@ object InMemoryFileIndex extends Logging {
     // in case of large #defaultParallelism.
     val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

-    val statusMap = sparkContext
-      .parallelize(serializedPaths, numParallelism)
-      .mapPartitions { pathStrings =>
-        val hadoopConf = serializableConfiguration.value
-        pathStrings.map(new Path(_)).toSeq.map { path =>
-          (path, listLeafFiles(path, hadoopConf, filter, None))
-        }.iterator
-      }.map { case (path, statuses) =>
-      val serializableStatuses = statuses.map { status =>
-        // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
-        val blockLocations = status match {
-          case f: LocatedFileStatus =>
-            f.getBlockLocations.map { loc =>
-              SerializableBlockLocation(
-                loc.getNames,
-                loc.getHosts,
-                loc.getOffset,
-                loc.getLength)
-            }
-
-          case _ =>
-            Array.empty[SerializableBlockLocation]
-        }
-
-        SerializableFileStatus(
-          status.getPath.toString,
-          status.getLen,
-          status.isDirectory,
-          status.getReplication,
-          status.getBlockSize,
-          status.getModificationTime,
-          status.getAccessTime,
-          blockLocations)
+    val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+    val statusMap = try {
+      val description = paths.size match {
+        case 0 =>
+          s"Listing leaf files and directories 0 paths"
+        case 1 =>
+          s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
+        case s =>
+          s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, ..."
       }
-      (path.toString, serializableStatuses)
-    }.collect()
+      sparkContext.setJobDescription(description)
+      sparkContext
+        .parallelize(serializedPaths, numParallelism)
+        .mapPartitions { pathStrings =>
+          val hadoopConf = serializableConfiguration.value
+          pathStrings.map(new Path(_)).toSeq.map { path =>
+            (path, listLeafFiles(path, hadoopConf, filter, None))
+          }.iterator
+        }.map { case (path, statuses) =>
+        val serializableStatuses = statuses.map { status =>
+          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+          val blockLocations = status match {
+            case f: LocatedFileStatus =>
+              f.getBlockLocations.map { loc =>
+                SerializableBlockLocation(
+                  loc.getNames,
+                  loc.getHosts,
+                  loc.getOffset,
+                  loc.getLength)
+              }
+
+            case _ =>
+              Array.empty[SerializableBlockLocation]
+          }
+
+          SerializableFileStatus(
+            status.getPath.toString,
+            status.getLen,
+            status.isDirectory,
+            status.getReplication,
+            status.getBlockSize,
+            status.getModificationTime,
+            status.getAccessTime,
+            blockLocations)
+        }
+        (path.toString, serializableStatuses)
+      }.collect()
+    } finally {
+      sparkContext.setJobDescription(previousJobDescription)
+    }

     // turn SerializableFileStatus back to Status
     statusMap.map { case (path, serializableStatuses) =>
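The new test in DataFrameReaderWriterSuite (second diff, below) checks the description by watching `SparkListenerJobStart` events. As a rough standalone illustration of that mechanism outside Spark's test harness, a sketch along these lines could be used; the local session, the object name, the literal `"spark.job.description"` key, and the sleep that stands in for the internal `listenerBus.waitUntilEmpty` call are all assumptions for illustration only.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.JavaConverters._

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.SparkSession

object JobDescriptionListenerSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative local session.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("job-description-listener-sketch")
      .getOrCreate()
    val sc = spark.sparkContext

    // Record the description of every job that starts.
    val descriptions = new ConcurrentLinkedQueue[String]()
    val listener = new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        // "spark.job.description" is the key behind the private[spark]
        // constant SparkContext.SPARK_JOB_DESCRIPTION used in the test.
        Option(jobStart.properties.getProperty("spark.job.description"))
          .foreach(d => descriptions.add(d))
      }
    }
    sc.addSparkListener(listener)
    try {
      sc.setJobDescription("example job")
      sc.parallelize(1 to 10, 2).count()
      // Listener events are delivered asynchronously; Spark's own tests wait on
      // listenerBus.waitUntilEmpty, so a crude sleep stands in for it here.
      Thread.sleep(2000)
      println(descriptions.asScala.mkString(", "))
    } finally {
      sc.removeSparkListener(listener)
      spark.stop()
    }
  }
}
```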
http://git-wip-us.apache.org/repos/asf/spark/blob/c8affec2/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 569bac1..a5d7e62 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -21,10 +21,14 @@ import java.io.File
 import java.util.Locale
 import java.util.concurrent.ConcurrentLinkedQueue

+import scala.collection.JavaConverters._
+
 import org.scalatest.BeforeAndAfter

+import org.apache.spark.SparkContext
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.internal.SQLConf
@@ -775,4 +779,31 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       }
     }
   }
+
+  test("use Spark jobs to list files") {
+    withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") {
+      withTempDir { dir =>
+        val jobDescriptions = new ConcurrentLinkedQueue[String]()
+        val jobListener = new SparkListener {
+          override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+            jobDescriptions.add(jobStart.properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+          }
+        }
+        sparkContext.addSparkListener(jobListener)
+        try {
+          spark.range(0, 3).map(i => (i, i))
+            .write.partitionBy("_1").mode("overwrite").parquet(dir.getCanonicalPath)
+          // normal file paths
+          checkDatasetUnorderly(
+            spark.read.parquet(dir.getCanonicalPath).as[(Long, Long)],
+            0L -> 0L, 1L -> 1L, 2L -> 2L)
+          sparkContext.listenerBus.waitUntilEmpty(10000)
+          assert(jobDescriptions.asScala.toList.exists(
+            _.contains("Listing leaf files and directories for 3 paths")))
+        } finally {
+          sparkContext.removeSparkListener(jobListener)
+        }
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org