Re: How to determine the function of tasks on each stage in an Apache Spark application?

2023-05-02 Thread Trường Trần Phan An
Hi all,

I have written a program and overridden two listener events, onStageCompleted and
onTaskEnd. However, these two events do not give me all the information I need
about a Task/Stage when it completes.

What I want to know is which Task corresponds to which stage of the DAG (the
Spark History Server only tells me how many Stages a Job has and how many
Tasks a Stage has).

Can I print out the edges of the DAG (the dependencies between Tasks/Stages) as
known to the DAGScheduler? Below is the program I have written:

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

class CustomListener extends SparkListener {

  // Called when a stage finishes: print the stage id, its task count,
  // and the RDDs (with their partition counts) the stage ran over.
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageInfo = stageCompleted.stageInfo
    println(s"Stage ${stageInfo.stageId}")
    println(s"Number of tasks: ${stageInfo.numTasks}")

    stageInfo.rddInfos.foreach { rddInfo =>
      println(s"RDD ${rddInfo.id} has ${rddInfo.numPartitions} partitions.")
    }
  }

  // Called when a task finishes: print the task id, the stage it belongs to
  // and its duration.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val stageId = taskEnd.stageId
    val taskInfo = taskEnd.taskInfo
    println(s"Task: ${taskInfo.taskId}; Stage: $stageId; Duration: ${taskInfo.duration} ms.")
  }

  // A simple word-count job used to generate jobs, stages and tasks.
  def wordCount(sc: SparkContext, inputPath: String): Unit = {
    val data = sc.textFile(inputPath)
    val words = data.flatMap(line => line.split(","))
    val pairs = words.map(word => (word, 1))
    val counts = pairs.reduceByKey(_ + _)
    counts.foreach(println)
  }
}

object Scenario1 {
  def main(args: Array[String]): Unit = {
    val appName = "scenario1"
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(appName)
      .getOrCreate()

    val sc = spark.sparkContext
    val sparkListener = new CustomListener()
    sc.addSparkListener(sparkListener)

    val inputPath = "s3a://data-join/file00"
    sparkListener.wordCount(sc, inputPath)
    sc.stop()
  }
}
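
For completeness, here is a rough sketch of the kind of mapping I am after (the
listener and method names below are only illustrative, but it relies on the
standard listener API: SparkListenerJobStart carries the StageInfos of a job,
and StageInfo.parentIds gives the parent-stage edges known to the DAGScheduler):

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd}

// Illustrative sketch: group task ids per stage and print the stage-level DAG edges.
class StageGraphListener extends SparkListener {
  private val tasksPerStage = mutable.Map.empty[Int, mutable.Buffer[Long]]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Each StageInfo knows its parent stages, i.e. the edges of the stage-level DAG.
    jobStart.stageInfos.foreach { stage =>
      println(s"Stage ${stage.stageId} (${stage.name}) <- parents: ${stage.parentIds.mkString(", ")}")
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Record which stage each finished task belonged to.
    tasksPerStage.getOrElseUpdate(taskEnd.stageId, mutable.Buffer.empty[Long]) += taskEnd.taskInfo.taskId
  }

  // Print the collected task-to-stage mapping (call after the job has finished).
  def report(): Unit =
    tasksPerStage.toSeq.sortBy(_._1).foreach { case (stageId, taskIds) =>
      println(s"Stage $stageId ran tasks: ${taskIds.sorted.mkString(", ")}")
    }
}

This would group every task id under its stage and print the stage-level edges,
although it still would not tell me which user-level function a given task runs.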

Best regards,

Truong


On Sun, Apr 16, 2023 at 09:32 Trường Trần Phan An <truong...@vlute.edu.vn> wrote:

> Dear Jacek Laskowski,
>
> Thank you for your guide. I will try it out for my problem.
>
> Best regards,
> Truong

Re: How to determine the function of tasks on each stage in an Apache Spark application?

2023-04-14 Thread Jacek Laskowski
Hi,

Start with intercepting stage completions using SparkListenerStageCompleted
[1]. That's Spark Core (jobs, stages and tasks).

Go up the execution chain to Spark SQL with SparkListenerSQLExecutionStart
[2] and SparkListenerSQLExecutionEnd [3], and correlate the information.

You may want to look at how the web UI works under the covers to collect all
the information. Start from SQLTab [4], which should show you what is displayed
and, from there, what data is needed and how it is collected.

[1]
https://github.com/apache/spark/blob/8cceb3946bdfa5ceac0f2b4fe6a7c43eafb76d59/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala#L46
[2]
https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44
[3]
https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L60
[4]
https://github.com/apache/spark/blob/c124037b97538b2656d29ce547b2a42209a41703/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala#L24
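
A minimal sketch of that correlation (assuming Spark 3.x: the SQL events arrive
through onOtherEvent, and jobs triggered by a SQL execution carry the execution
id in the spark.sql.execution.id local property; the class and map names below
are only illustrative):

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart, SparkListenerStageCompleted}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// Sketch: correlate SQL executions with the jobs and stages they spawn.
class SqlCorrelationListener extends SparkListener {
  private val jobsPerExecution = mutable.Map.empty[Long, mutable.Buffer[Int]]

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${e.executionId} started: ${e.description}")
    case e: SparkListenerSQLExecutionEnd =>
      val jobs = jobsPerExecution.get(e.executionId).map(_.mkString(", ")).getOrElse("none")
      println(s"SQL execution ${e.executionId} finished; jobs: $jobs")
    case _ =>
  }

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // Jobs triggered by Spark SQL carry the parent execution id as a local property.
    Option(jobStart.properties.getProperty("spark.sql.execution.id")).foreach { id =>
      jobsPerExecution.getOrElseUpdate(id.toLong, mutable.Buffer.empty[Int]) += jobStart.jobId
      println(s"Job ${jobStart.jobId} belongs to SQL execution $id; stages: ${jobStart.stageIds.mkString(", ")}")
    }
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit =
    println(s"Stage ${stageCompleted.stageInfo.stageId} completed " +
      s"with ${stageCompleted.stageInfo.numTasks} tasks")
}

From there the stage- and task-level events can be attached to the SQL execution
that owns them.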

Pozdrawiam,
Jacek Laskowski

"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski






Re: How to determine the function of tasks on each stage in an Apache Spark application?

2023-04-13 Thread Trường Trần Phan An
Hi,

Can you give me more details or point me to a tutorial on "You'd have to
intercept execution events and correlate them. Not an easy task yet doable"?

Thanks



Re: How to determine the function of tasks on each stage in an Apache Spark application?

2023-04-12 Thread Maytas Monsereenusorn
Hi,

I was wondering: if it is not possible to map tasks to functions, is it still
possible to easily figure out from the UI which job and stage completed which
part of the query?
For example, in the SQL tab of the Spark UI, I am able to see the query and
the Job IDs for that query. However, when looking at the details for the
query, how do I know which part of the execution plan was completed by
which job/stage?
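
For reference, one possible programmatic angle on this, as a rough sketch only
(the listener below is purely illustrative): the plan tree carried by
SparkListenerSQLExecutionStart lists the SQL metric accumulator ids per plan
node, and each completed stage reports the accumulables it updated, so matching
the two ties plan nodes to the stages that executed them.

import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerStageCompleted}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Rough sketch: tie physical-plan nodes to stages by matching the plan's
// SQL metric accumulator ids against the accumulables each stage reports.
class PlanToStageListener extends SparkListener {
  // accumulator id -> name of the plan node that owns the metric
  private val metricToNode = mutable.Map.empty[Long, String]

  private def index(plan: SparkPlanInfo): Unit = {
    plan.metrics.foreach(m => metricToNode(m.accumulatorId) = plan.nodeName)
    plan.children.foreach(index)
  }

  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart => index(e.sparkPlanInfo)
    case _ =>
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val nodes = stageCompleted.stageInfo.accumulables.keys.flatMap(metricToNode.get).toSet
    if (nodes.nonEmpty)
      println(s"Stage ${stageCompleted.stageInfo.stageId} ran plan nodes: ${nodes.mkString(", ")}")
  }
}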

Thanks,
Maytas




Re: How to determine the function of tasks on each stage in an Apache Spark application?

2023-04-12 Thread Jacek Laskowski
Hi,

tl;dr it's not possible to "reverse-engineer" tasks to functions.

In essence, Spark SQL is an abstraction layer over the RDD API, which is made up
of partitions and tasks. Tasks are Scala functions (possibly with some
Python for PySpark). A simple-looking high-level operator like
DataFrame.join can end up with multiple RDDs, each with a set of partitions
(and hence tasks). What the tasks do is an implementation detail that you'd
have to learn by reading the source code of Spark SQL, which produces
the "bytecode".

Just looking at the DAG or the task screenshots won't give you that level
of detail. You'd have to intercept execution events and correlate them. Not
an easy task yet doable. HTH.
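
One quick way to see that expansion (a sketch only; the object, data and column
names below are made up for illustration): print the physical plan and the
underlying RDD lineage of a small join.

import org.apache.spark.sql.SparkSession

object JoinLineageDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join-lineage").getOrCreate()
    import spark.implicits._

    // Two tiny DataFrames, just for illustration.
    val left  = Seq((1, "a"), (2, "b")).toDF("id", "v1")
    val right = Seq((1, "x"), (3, "y")).toDF("id", "v2")
    val joined = left.join(right, "id")

    // The physical plan Spark SQL generated for the single join operator.
    println(joined.queryExecution.executedPlan)

    // The underlying RDD lineage: one high-level join, several chained RDDs.
    println(joined.queryExecution.toRdd.toDebugString)

    spark.stop()
  }
}

toDebugString prints the chain of RDDs (and hence the stages and tasks) that the
one-line join compiles down to, which gives a feel for how much machinery sits
behind that single operator.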

Pozdrawiam,
Jacek Laskowski

"The Internals Of" Online Books 
Follow me on https://twitter.com/jaceklaskowski




On Tue, Apr 11, 2023 at 6:53 PM Trường Trần Phan An 
wrote:

> Hi all,
>
> I am conducting a study comparing the execution time of Bloom Filter Join
> operation on two environments: Apache Spark Cluster and Apache Spark. I
> have compared the overall time of the two environments, but I want to
> compare specific "tasks on each stage" to see which computation has the
> most significant difference.
>
> I have taken a screenshot of the DAG of Stage 0 and the list of tasks
> executed in Stage 0.
> - DAG.png
> - Task.png
>
> *I have questions:*
> 1. Can we determine which tasks are responsible for executing each step
> scheduled on the DAG during the processing?
> 2. Is it possible to know the function of each task (e.g., what is task ID
> 0 responsible for? What is task ID 1 responsible for? ... )?
>
> Best regards,
> Truong
>