The performance difference when running Apache Spark on K8s and traditional server

2023-07-27 Thread Trường Trần Phan An
Hi all,

I am studying the performance difference of Spark when running a JOIN
workload on Serverless (K8s) and Serverful (traditional server)
environments.

In my experiments, Spark on K8s tends to run slower than the Serverful setup.
From the architecture, I understand that Spark on K8s runs as containers
(Pods), so there is a certain initialization overhead; however, even when I
look at each individual job, stage, and task, Spark on K8s still tends to be
slower than Serverful.

*I have some questions:*
Q1: What are the causes of Spark on K8s being slower than Serverful?
Q2: Is there a scenario that shows the clearest difference in performance and
cost between these two environments (Serverless (K8S) and Serverful
(traditional server))? A rough comparison setup is sketched below.
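
A minimal sketch of what such a controlled comparison might look like (this is
my own sketch, not a definitive benchmark; everything in angle brackets is a
placeholder, and the idea is to keep the code and executor resources identical
and change only the master and the K8s-specific settings, so any remaining gap
can be attributed to pod scheduling, image pulls, and container start-up
rather than to the JOIN itself):

import org.apache.spark.sql.SparkSession

object JoinBenchmark {
  def main(args: Array[String]): Unit = {
    // Pass "k8s" as the first argument to target Kubernetes, anything else
    // (or nothing) to target the traditional standalone cluster.
    val onK8s = args.headOption.contains("k8s")

    val builder = SparkSession.builder()
      .appName("join-benchmark")
      .config("spark.executor.instances", "4")  // keep resources identical
      .config("spark.executor.cores", "2")
      .config("spark.executor.memory", "4g")

    val spark =
      if (onK8s)
        builder
          .master("k8s://https://<api-server>:6443")                    // placeholder
          .config("spark.kubernetes.container.image", "<spark-image>")  // placeholder
          .getOrCreate()
      else
        builder
          .master("spark://<standalone-master>:7077")                   // placeholder
          .getOrCreate()

    // The same JOIN in both runs; the input paths are placeholders.
    val left = spark.read.parquet("<path-to-left-table>")
    val right = spark.read.parquet("<path-to-right-table>")

    val t0 = System.nanoTime()
    val rows = left.join(right, "id").count()
    println(s"JOIN produced $rows rows in ${(System.nanoTime() - t0) / 1e9} s")

    spark.stop()
  }
}

Comparing the per-stage times in the history server for the two runs, in
addition to the wall-clock time above, should help separate scheduling and
start-up overhead from the cost of the JOIN itself.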

Thank you so much!

Best regards,
Truong


Re: How to determine the function of tasks on each stage in an Apache Spark application?

2023-05-02 Thread Trường Trần Phan An
Hi all,

I have written a program and overridden two events, onStageCompleted and
onTaskEnd. However, these two events do not give me the information I need.

What I want to know is which Task corresponds to which stage of the DAG (the
Spark History Server only tells me how many Stages a Job has and how many
Tasks a Stage has).

Can I print out the edges of the Tasks according to the DAGScheduler?
Below is the program I have written:

import org.apache.spark.rdd.RDD
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext, TaskEndReason}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate, SparkListenerStageCompleted, SparkListenerTaskEnd}
import scala.collection.mutable
import org.apache.spark.sql.execution.SparkPlan

class CustomListener extends SparkListener {

  // Fired when a stage finishes: print the stage id, its task count,
  // and the RDDs (with their partition counts) that the stage computed.
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageInfo = stageCompleted.stageInfo
    println(s"Stage ${stageInfo.stageId}")
    println(s"Number of tasks: ${stageInfo.numTasks}")

    stageInfo.rddInfos.foreach { rddInfo =>
      println(s"RDD ${rddInfo.id} has ${rddInfo.numPartitions} partitions.")
    }
  }

  // Fired when a task finishes: print the task id, the stage it belongs to,
  // and how long it ran.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val stageId = taskEnd.stageId
    val taskInfo = taskEnd.taskInfo
    println(s"Task: ${taskInfo.taskId}; Stage: $stageId; Duration: ${taskInfo.duration} ms.")
  }

  // A simple word-count job used to generate stages and tasks to observe.
  def wordCount(sc: SparkContext, inputPath: String): Unit = {
    val data = sc.textFile(inputPath)
    val flatMap = data.flatMap(line => line.split(","))
    val map = flatMap.map(word => (word, 1))
    val reduceByKey = map.reduceByKey(_ + _)
    reduceByKey.foreach(println)
  }
}

object Scenario1 {
  def main(args: Array[String]): Unit = {
    val appName = "scenario1"
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName(appName)
      .getOrCreate()

    // Register the listener before running the job so its events are captured.
    val sc = spark.sparkContext
    val sparkListener = new CustomListener()
    sc.addSparkListener(sparkListener)

    val inputPath = "s3a://data-join/file00"
    sparkListener.wordCount(sc, inputPath)
    sc.stop()
  }
}
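
A minimal sketch of what I think could expose the stage-level edges (the DAG
edges are between stages rather than between individual tasks, since every
task in a stage runs the same function on a different partition).
DagEdgeListener is just a placeholder name; it would be registered with
sc.addSparkListener like CustomListener above:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Prints the stage-level DAG edges as soon as a job is submitted,
// using the parent stage ids carried by each StageInfo.
class DagEdgeListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    jobStart.stageInfos.foreach { stageInfo =>
      println(s"Job ${jobStart.jobId}: Stage ${stageInfo.stageId} " +
        s"<- parent stages [${stageInfo.parentIds.mkString(", ")}]")
    }
  }
}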

Best regards,

Truong


On Sun, Apr 16, 2023 at 09:32 Trường Trần Phan An <
truong...@vlute.edu.vn> wrote:

> Dear Jacek Laskowski,
>
> Thank you for your guide. I will try it out for my problem.
>
> Best regards,
> Truong
>
>
> On Fri, Apr 14, 2023 at 21:00 Jacek Laskowski wrote:
>
>> Hi,
>>
>> Start with intercepting stage completions
>> using SparkListenerStageCompleted [1]. That's Spark Core (jobs, stages and
>> tasks).
>>
>> Go up the execution chain to Spark SQL
>> with SparkListenerSQLExecutionStart [2] and SparkListenerSQLExecutionEnd
>> [3], and correlate infos.
>>
>> You may want to look at how web UI works under the covers to collect all
>> the information. Start from SQLTab that should give you what is displayed
>> (that should give you then what's needed and how it's collected).
>>
>> [1]
>> https://github.com/apache/spark/blob/8cceb3946bdfa5ceac0f2b4fe6a7c43eafb76d59/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala#L46
>> [2]
>> https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L44
>> [3]
>> https://github.com/apache/spark/blob/24cdae8f3dcfc825c6c0b8ab8aa8505ae194050b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala#L60
>> [4]
>> https://github.com/apache/spark/blob/c124037b97538b2656d29ce547b2a42209a41703/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala#L24
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>>
>>
>> On Thu, Apr 13, 2023 at 10:40 AM Trường Trần Phan An <
>> truong...@vlute.edu.vn> wrote:
>>
>>> Hi,
>>>
>>> Can you give me more details or point me to a tutorial on "You'd have to
>>> intercept execution events and correlate them. Not an easy task yet doable"?
>>>
>>> Thanks
>>>
>>> On Wed, Apr 12, 2023 at 21:04 Jacek Laskowski <
>>> ja...@japila.pl> wrote:
>>>
>>>> Hi,
>>>>
>>>> tl;dr it's not possible to "reverse-engineer" tasks to functions.
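
Following the guidance quoted above (intercept SparkListenerStageCompleted,
then the SQL execution events, and correlate them), a rough sketch of how the
correlation might be wired up. This is my own sketch, not from the thread; it
assumes the SQL-level events arrive through onOtherEvent and that job starts
carry the "spark.sql.execution.id" property, which is how the web UI ties jobs
back to SQL executions:

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}

// SqlCorrelationListener is a placeholder name for this sketch.
class SqlCorrelationListener extends SparkListener {

  // SQL-level events are not first-class listener callbacks; they arrive
  // through onOtherEvent and have to be pattern-matched.
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"SQL execution ${e.executionId} started: ${e.description}")
    case e: SparkListenerSQLExecutionEnd =>
      println(s"SQL execution ${e.executionId} ended")
    case _ => // ignore everything else
  }

  // Each job started on behalf of a SQL query carries the execution id in its
  // properties, linking the job (and its stages/tasks) back to the query.
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val execId = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.sql.execution.id")))
    println(s"Job ${jobStart.jobId} (SQL execution: ${execId.getOrElse("n/a")}) " +
      s"covers stages ${jobStart.stageIds.mkString(", ")}")
  }
}

Registering it with spark.sparkContext.addSparkListener (or via the
spark.extraListeners configuration) and combining its output with the
onStageCompleted/onTaskEnd output above should give the query-to-stage-to-task
mapping.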

Re: How to determine the function of tasks on each stage in an Apache Spark application?

2023-04-13 Thread Trường Trần Phan An
Hi,

Can you give me more details or point me to a tutorial on "You'd have to
intercept execution events and correlate them. Not an easy task yet doable"?

Thanks

On Wed, Apr 12, 2023 at 21:04 Jacek Laskowski wrote:

> Hi,
>
> tl;dr it's not possible to "reverse-engineer" tasks to functions.
>
> In essence, Spark SQL is an abstraction layer over RDD API that's made up
> of partitions and tasks. Tasks are Scala functions (possibly with some
> Python for PySpark). A simple-looking high-level operator like
> DataFrame.join can end up with multiple RDDs, each with a set of partitions
> (and hence tasks). What the tasks do is an implementation detail that you'd
> have to know about by reading the source code of Spark SQL that produces
> the "bytecode".
>
> Just looking at the DAG or the tasks screenshots won't give you that level
> of detail. You'd have to intercept execution events and correlate them. Not
> an easy task yet doable. HTH.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> "The Internals Of" Online Books <https://books.japila.pl/>
> Follow me on https://twitter.com/jaceklaskowski
>
>
>
> On Tue, Apr 11, 2023 at 6:53 PM Trường Trần Phan An <
> truong...@vlute.edu.vn> wrote:
>
>> Hi all,
>>
>> I am conducting a study comparing the execution time of the Bloom Filter
>> Join operation in two environments: Apache Spark Cluster and Apache Spark. I
>> have compared the overall time in the two environments, but now I want to
>> compare the specific tasks in each stage to see which computation shows the
>> most significant difference.
>>
>> I have taken a screenshot of the DAG of Stage 0 and the list of tasks
>> executed in Stage 0.
>> - DAG.png
>> - Task.png
>>
>> *I have questions:*
>> 1. Can we determine which tasks are responsible for executing each step
>> scheduled on the DAG during the processing?
>> 2. Is it possible to know the function of each task (e.g., what is task
>> ID 0 responsible for? What is task ID 1 responsible for? ... )?
>>
>> Best regards,
>> Truong
>>
>
>
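
To make the point above concrete, a small self-contained sketch (mine, not
from the thread) showing that even a simple DataFrame.join compiles into
several physical operators, which is what the tasks in each stage actually
execute; explain() prints that plan, and the web UI then shows the resulting
jobs, stages, and tasks:

import org.apache.spark.sql.SparkSession

object JoinPlanDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("join-plan-demo")
      .getOrCreate()
    import spark.implicits._

    // Two tiny DataFrames, just enough to produce a join.
    val left  = Seq((1, "a"), (2, "b")).toDF("id", "l")
    val right = Seq((1, "x"), (3, "y")).toDF("id", "r")

    val joined = left.join(right, "id")
    joined.explain()  // the physical plan: the operators the tasks will run
    joined.collect()  // triggers the jobs/stages/tasks visible in the web UI
    spark.stop()
  }
}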