Repository: spark
Updated Branches:
  refs/heads/master 969ffd631 -> c8affec21


[SPARK-22203][SQL] Add job description for file listing Spark jobs

## What changes were proposed in this pull request?

Users may be confused by Spark jobs that launch up to 10000 tasks just to list files. This PR adds a job 
description to these file-listing jobs so that users can tell what they are for.
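
For context, the pattern the patch relies on can be sketched as a small helper: save the caller's current job description, set a descriptive one before running the job, and restore the original afterwards. This is only an illustrative sketch, not the patch itself (see the diff below); the object name, helper name, and the hard-coded description are made up for the example.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object JobDescriptionSketch {
  // "spark.job.description" is the local property that SparkContext.setJobDescription sets.
  // Restoring the previous value in a finally block ensures the caller's own job
  // description is not clobbered if the body throws.
  def withJobDescription[T](sc: SparkContext, description: String)(body: => T): T = {
    val previous = sc.getLocalProperty("spark.job.description")
    sc.setJobDescription(description)
    try body finally sc.setJobDescription(previous)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    // The description below shows up on the Jobs page of the Spark UI for this job.
    withJobDescription(spark.sparkContext, "Listing leaf files and directories for 3 paths") {
      spark.sparkContext.parallelize(Seq("a", "b", "c"), 3).count()
    }
    spark.stop()
  }
}
```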

## How was this patch tested?

A new unit test; the screenshots below show the Spark UI before and after this change.

Before:
<img width="343" alt="screen shot 2017-10-04 at 3 22 09 pm" 
src="https://user-images.githubusercontent.com/1000778/31202567-f78d15c0-a917-11e7-841e-11b8bf8f0032.png";>

After:
<img width="473" alt="screen shot 2017-10-04 at 3 13 51 pm" 
src="https://user-images.githubusercontent.com/1000778/31202576-fc01e356-a917-11e7-9c2b-7bf80b153adb.png";>

Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #19432 from zsxwing/SPARK-22203.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8affec2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8affec2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8affec2

Branch: refs/heads/master
Commit: c8affec21c91d638009524955515fc143ad86f20
Parents: 969ffd6
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Wed Oct 4 20:58:48 2017 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Wed Oct 4 20:58:48 2017 -0700

----------------------------------------------------------------------
 .../datasources/InMemoryFileIndex.scala         | 85 ++++++++++++--------
 .../sql/test/DataFrameReaderWriterSuite.scala   | 31 +++++++
 2 files changed, 81 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c8affec2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 203d449..318ada0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
@@ -187,42 +188,56 @@ object InMemoryFileIndex extends Logging {
     // in case of large #defaultParallelism.
     val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)
 
-    val statusMap = sparkContext
-      .parallelize(serializedPaths, numParallelism)
-      .mapPartitions { pathStrings =>
-        val hadoopConf = serializableConfiguration.value
-        pathStrings.map(new Path(_)).toSeq.map { path =>
-          (path, listLeafFiles(path, hadoopConf, filter, None))
-        }.iterator
-      }.map { case (path, statuses) =>
-      val serializableStatuses = statuses.map { status =>
-        // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
-        val blockLocations = status match {
-          case f: LocatedFileStatus =>
-            f.getBlockLocations.map { loc =>
-              SerializableBlockLocation(
-                loc.getNames,
-                loc.getHosts,
-                loc.getOffset,
-                loc.getLength)
-            }
-
-          case _ =>
-            Array.empty[SerializableBlockLocation]
-        }
-
-        SerializableFileStatus(
-          status.getPath.toString,
-          status.getLen,
-          status.isDirectory,
-          status.getReplication,
-          status.getBlockSize,
-          status.getModificationTime,
-          status.getAccessTime,
-          blockLocations)
+    val previousJobDescription = sparkContext.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
+    val statusMap = try {
+      val description = paths.size match {
+        case 0 =>
+          s"Listing leaf files and directories 0 paths"
+        case 1 =>
+          s"Listing leaf files and directories for 1 path:<br/>${paths(0)}"
+        case s =>
+          s"Listing leaf files and directories for $s paths:<br/>${paths(0)}, 
..."
       }
-      (path.toString, serializableStatuses)
-    }.collect()
+      sparkContext.setJobDescription(description)
+      sparkContext
+        .parallelize(serializedPaths, numParallelism)
+        .mapPartitions { pathStrings =>
+          val hadoopConf = serializableConfiguration.value
+          pathStrings.map(new Path(_)).toSeq.map { path =>
+            (path, listLeafFiles(path, hadoopConf, filter, None))
+          }.iterator
+        }.map { case (path, statuses) =>
+        val serializableStatuses = statuses.map { status =>
+          // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
+          val blockLocations = status match {
+            case f: LocatedFileStatus =>
+              f.getBlockLocations.map { loc =>
+                SerializableBlockLocation(
+                  loc.getNames,
+                  loc.getHosts,
+                  loc.getOffset,
+                  loc.getLength)
+              }
+
+            case _ =>
+              Array.empty[SerializableBlockLocation]
+          }
+
+          SerializableFileStatus(
+            status.getPath.toString,
+            status.getLen,
+            status.isDirectory,
+            status.getReplication,
+            status.getBlockSize,
+            status.getModificationTime,
+            status.getAccessTime,
+            blockLocations)
+        }
+        (path.toString, serializableStatuses)
+      }.collect()
+    } finally {
+      sparkContext.setJobDescription(previousJobDescription)
+    }
 
     // turn SerializableFileStatus back to Status
     statusMap.map { case (path, serializableStatuses) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/c8affec2/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 569bac1..a5d7e62 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -21,10 +21,14 @@ import java.io.File
 import java.util.Locale
 import java.util.concurrent.ConcurrentLinkedQueue
 
+import scala.collection.JavaConverters._
+
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.internal.SQLConf
@@ -775,4 +779,31 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       }
     }
   }
+
+  test("use Spark jobs to list files") {
+    withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "1") {
+      withTempDir { dir =>
+        val jobDescriptions = new ConcurrentLinkedQueue[String]()
+        val jobListener = new SparkListener {
+          override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+            jobDescriptions.add(jobStart.properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+          }
+        }
+        sparkContext.addSparkListener(jobListener)
+        try {
+          spark.range(0, 3).map(i => (i, i))
+            .write.partitionBy("_1").mode("overwrite").parquet(dir.getCanonicalPath)
+          // normal file paths
+          checkDatasetUnorderly(
+            spark.read.parquet(dir.getCanonicalPath).as[(Long, Long)],
+            0L -> 0L, 1L -> 1L, 2L -> 2L)
+          sparkContext.listenerBus.waitUntilEmpty(10000)
+          assert(jobDescriptions.asScala.toList.exists(
+            _.contains("Listing leaf files and directories for 3 paths")))
+        } finally {
+          sparkContext.removeSparkListener(jobListener)
+        }
+      }
+    }
+  }
 }

