Hi all,
I have written a program with a custom SparkListener that overrides two
callbacks, onStageCompleted and onTaskEnd. However, these two events do not
seem to provide information on when a task or stage is completed.
What I want to know is which task corresponds to which stage of the DAG (the
Spark history server only tells me how many stages a job has and how many
tasks a stage has).
Can I print out the edges between the tasks as the DAGScheduler sees them?
Below is the program I have written:

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

class CustomListener extends SparkListener {

  // Called when a stage completes (successfully or not).
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageInfo = stageCompleted.stageInfo
    println(s"Stage ${stageInfo.stageId}")
    println(s"Number of tasks: ${stageInfo.numTasks}")
    stageInfo.rddInfos.foreach { rddInfo =>
      println(s"RDD ${rddInfo.id} has ${rddInfo.numPartitions} partitions.")
    }
  }

  // Called each time a task ends; stageId is the stage the task belongs to.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val stageId = taskEnd.stageId
    val taskInfo = taskEnd.taskInfo
    println(s"Task: ${taskInfo.taskId}; Stage: $stageId; Duration: ${taskInfo.duration} ms.")
  }

  // Word count job used to generate the stage and task events.
  def wordCount(sc: SparkContext, inputPath: String): Unit = {
    val data = sc.textFile(inputPath)
    val flatMap = data.flatMap(line => line.split(","))
    val map = flatMap.map(word => (word, 1))
    val reduceByKey = map.reduceByKey(_ + _)
    reduceByKey.foreach(println)
  }
}

object Scenario1 {
  def main(args: Array[String]): Unit = {
    val appName = "scenario1"
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(appName)
      .getOrCreate()
    val sc = spark.sparkContext

    // Register the listener before running the job so that no events are missed.
    val sparkListener = new CustomListener()
    sc.addSparkListener(sparkListener)

    val inputPath = "s3a://data-join/file00"
    sparkListener.wordCount(sc, inputPath)
    sc.stop()
  }
}

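
For reference, here is a minimal sketch of one way to record the task-to-stage
mapping and the stage dependencies from listener events. As far as I
understand, the DAGScheduler tracks dependencies between stages rather than
between individual tasks, so the sketch prints stage-to-stage edges and groups
task IDs by stage. It relies on StageInfo.parentIds, StageInfo.completionTime
and TaskInfo.finishTime from the Spark listener API. DagRecorder and
DagListener are hypothetical names of my own, not part of Spark.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerTaskEnd}
import scala.collection.mutable

// Hypothetical helper (not part of Spark): accumulates what the listener sees.
class DagRecorder {
  // (parentStageId, childStageId) edges of the stage DAG.
  val stageEdges = mutable.LinkedHashSet.empty[(Int, Int)]
  // stageId -> IDs of the tasks that ended in that stage.
  val tasksByStage = mutable.Map.empty[Int, mutable.ArrayBuffer[Long]]

  def addStage(stageId: Int, parentIds: Seq[Int]): Unit = synchronized {
    parentIds.foreach(parent => stageEdges += ((parent, stageId)))
  }

  def addTask(stageId: Int, taskId: Long): Unit = synchronized {
    tasksByStage.getOrElseUpdate(stageId, mutable.ArrayBuffer.empty[Long]) += taskId
  }

  // One line per stage dependency, sorted by (parent, child).
  def edgeLines: Seq[String] = synchronized {
    stageEdges.toSeq.sorted.map { case (parent, child) => s"Stage $parent -> Stage $child" }
  }
}

// Hypothetical listener: forwards stage and task events to the recorder.
class DagListener(recorder: DagRecorder) extends SparkListener {

  // A job start event carries the StageInfo of every stage planned for the job.
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobStart.stageInfos.foreach(s => recorder.addStage(s.stageId, s.parentIds))
  }

  // Each task end event names the stage the task ran in.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    recorder.addTask(taskEnd.stageId, info.taskId)
    println(s"Task ${info.taskId} (partition ${info.index}) of stage " +
      s"${taskEnd.stageId} finished at ${info.finishTime}")
  }

  // completionTime is an Option; it is empty if the stage never finished.
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val s = stageCompleted.stageInfo
    println(s"Stage ${s.stageId} completed at ${s.completionTime.getOrElse(-1L)}; " +
      s"parents: ${s.parentIds.mkString(", ")}")
  }
}
```

To try it, register the listener with sc.addSparkListener(new
DagListener(recorder)) before running the job, then print recorder.edgeLines
and recorder.tasksByStage. Listener events are delivered asynchronously, so
reading the recorder after sc.stop() is probably the safer choice. For the
word count above I would expect a single edge, from the shuffle map stage to
the result stage created by reduceByKey.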
Best regards,
Truong
On Sun, Apr 16, 2023 at 09:32, Trường Trần Phan An <
truong...@vlute.edu.vn> wrote:
> Dear Jacek Laskowski,
>
> Thank you for your guidance. I will try it out for my problem.
>
> Best regards,
> Truong
>
>
> On Fri, Apr 14, 2023 at 21:00, Jacek Laskowski wrote:
>
>> Hi,
>>
>> Start with intercepting stage completions
>> using SparkListenerStageCompleted [1]. That's Spark Core (jobs, stages and
>> tasks).
>>
>> Then go up the execution chain to Spark SQL
>> with SparkListenerSQLExecutionStart [2] and SparkListenerSQLExecutionEnd
>> [3], and correlate the information.
>>
>> You may want to look at how the web UI works under the covers to collect all
>> this information. Start from SQLTab [4]: it shows what is displayed, which
>> in turn tells you what is needed and how it is collected.
>>
>> [1]
>> https://github.com/apache/spark/blob/8cceb3946bdfa5ceac0f2b4fe6a7c43eafb76d59/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala#L46
>> [2]
>> https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44
>> [3]
>> https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L60
>> [4]
>> https://github.com/apache/spark/blob/c124037b97538b2656d29ce547b2a42209a41703/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala#L24
>>
>> Regards,
>> Jacek Laskowski
>>
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>>
>> On Thu, Apr 13, 2023 at 10:40 AM Trường Trần Phan An <
>> truong...@vlute.edu.vn> wrote:
>>
>>> Hi,
>>>
>>> Can you give me more details, or point me to a tutorial, on "You'd have to
>>> intercept execution events and correlate them. Not an easy task yet doable"?
>>>
>>> Thanks
>>>
>>> On Wed, Apr 12, 2023 at 21:04, Jacek Laskowski <
>>> ja...@japila.pl> wrote:
>>>
>>>> Hi,
>>>>
>>>> tl;dr it's not possible to "reverse-engineer" tasks to fun