[ https://issues.apache.org/jira/browse/SPARK-33635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17256046#comment-17256046 ]

David Wyles edited comment on SPARK-33635 at 12/29/20, 4:34 PM:
----------------------------------------------------------------

[~gsomogyi] I now have my results.
 I was so unhappy with these results that I ran all the tests again. The only 
thing that changed between runs is the version of Spark on the cluster; 
everything else was static - the input from Kafka was an unchanging, static 
set of data.

Input-> *672733262* rows

+*Spark 2.4.5*:+

*440* seconds - *1,528,939* rows per second.

+*Spark 3.0.1*:+

*990* seconds - *679,528* rows per second.

These figures are from multiple runs (I even took the best run from Spark 3.0.1).

I also captured the event logs for both versions of Spark, should anyone find 
them useful.

[event logs|https://drive.google.com/drive/folders/1aElmzVWmJqRALQimdOYxdJu559_3EX_9?usp=sharing]

So, no matter what I do, I can only conclude that Spark 2.4.5 was a lot faster 
in this test case (in my production use case I'm just writing to Parquet files 
in HDFS, which is where I noticed the degradation in performance).
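
For reference, the production variant is the same read but with a Parquet sink, roughly like this (a minimal sketch; the HDFS output path is illustrative, not the real one):

// Same DataFrame as in the sample below, written to Parquet on HDFS
// instead of back to Kafka. The path here is hypothetical.
df.write
  .mode(SaveMode.Append)
  .parquet("hdfs:///tmp/kafka-read-perf/output")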

Is Spark SQL reading the source data twice, just as it would if there were an 
"order by" in the query?

Sample code used:

import org.apache.spark.sql.{SaveMode, SparkSession}

// "config" holds the Kafka brokers and topic names; it is defined
// elsewhere in the test harness.
val spark =
  SparkSession.builder.appName("Kafka Read Performance")
    .config("spark.executor.memory", "16g")
    .config("spark.cores.max", "10")
    .config("spark.eventLog.enabled", "true")
    .config("spark.eventLog.dir", "file:///tmp/spark-events")
    .config("spark.eventLog.overwrite", "true")
    .getOrCreate()

import spark.implicits._

val startTime = System.nanoTime()

// Batch read of the whole topic, earliest to latest offsets.
val df =
  spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", config.brokers)
    .option("subscribe", config.inTopic)
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .load()

// Write the rows straight back out to Kafka.
df
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", config.brokers)
  .option("topic", config.outTopic)
  .mode(SaveMode.Append)
  .save()

val endTime = System.nanoTime()

val elapsedSecs = (endTime - startTime) / 1e9

// Static input sample was used, so the row count is fixed.
println(s"Took $elapsedSecs secs")
spark.stop()
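
The rows-per-second figures above are just the fixed input row count divided by the elapsed time, along these lines (the count is the static input size quoted above):

// Throughput: static input size divided by wall-clock seconds.
val rowCount = 672733262L
println(f"${rowCount / elapsedSecs}%.0f rows per second")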

 


> Performance regression in Kafka read
> ------------------------------------
>
>                 Key: SPARK-33635
>                 URL: https://issues.apache.org/jira/browse/SPARK-33635
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0, 3.0.1
>         Environment: A simple 5-node system. Simple rows of CSV data in 
> Kafka, evenly distributed between the partitions.
> OpenJDK 1.8.0.252
> Spark in standalone mode - 5 nodes, 10 workers (2 workers per node, each locked to 
> a distinct NUMA group)
> Kafka (v2.3.1) cluster - 5 nodes (1 broker per node).
> CentOS 7.7.1908
> 1 topic, 10 partitions, 1 hour retention
> (this is just one of the clusters we have; I have tested on all of them and 
> they all exhibit the same performance degradation)
>            Reporter: David Wyles
>            Priority: Major
>
> I have observed a slowdown in reading data from Kafka on all of our 
> systems when migrating from Spark 2.4.5 to Spark 3.0.0 (and Spark 3.0.1).
> I have created a sample project to isolate the problem as much as possible, 
> with just a read of all data from a Kafka topic (see 
> [https://github.com/codegorillauk/spark-kafka-read]).
> With 2.4.5, across multiple runs, 
>  I get a stable read rate of 1,120,000 (1.12 million) rows per second.
> With 3.0.0 or 3.0.1, across multiple runs,
>  I get a stable read rate of 632,000 (0.632 million) rows per second.
> That represents a *44% loss in performance*, which is a lot.
> I have been working through the spark-sql-kafka-0-10 code base, but changes for 
> Spark 3 have been ongoing for over a year and it's difficult to pinpoint an 
> exact change or reason for the degradation.
> I am happy to help fix this problem, but will need some assistance as I am 
> unfamiliar with the spark-sql-kafka-0-10 project.
>  
> A sample of the data my test reads (note: it's not parsing CSV - this is just 
> test data).
>  
> 1606921800000,001e0610e532,lightsense,tsl250rd,intensity,21853,53.262,acceleration_z,651,ep,290,commit,913,pressure,138,pm1,799,uv_intensity,823,idletime,-372,count,-72,ir_intensity,185,concentration,-61,flags,-532,tx,694.36,ep_heatsink,-556.92,acceleration_x,-221.40,fw,910.53,sample_flow_rate,-959.60,uptime,-515.15,pm10,-768.03,powersupply,214.72,magnetic_field_y,-616.04,alphasense,606.73,AoT_Chicago,053,Racine
>  Ave & 18th St Chicago IL,41.857959,-87.65642700000002,AoT Chicago (S) 
> [C],2017/12/15 00:00:00,


