[ 
https://issues.apache.org/jira/browse/SPARK-31427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17300773#comment-17300773
 ] 

Nick Hryhoriev edited comment on SPARK-31427 at 3/13/21, 9:22 AM:
------------------------------------------------------------------

I check spark 3, it's the same.
 Maybe in the case of repartition by the range you are right.
 But the same issue happens in  Sort and repartition.
 ```

 
{code:java}
package org.apache.spark.af.it
import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SparkSession
object TestKafka {
def main(args: Array[String]): Unit =
{ implicit val spark: SparkSession = 
SparkSession.builder().master("local").getOrCreate() spark .readStream 
.format("kafka") .option("kafka.bootstrap.servers", 
"kafka-20001-026-prod.eu1.appsflyer.com:9092") .option("assign", 
"""\{"sessions":[0,1] }
""")
 .load()
 .writeStream
 .foreachBatch(process _)
 .start()
 .awaitTermination(1000000)
}
private def process(data: Dataset[Row], batchId: Long)(implicit spark: 
SparkSession): Unit =
{ import spark.implicits._ spark.sparkContext.setJobDescription(s"session 
[parquet] - $batchId") data .sort($"partition", $"offset") .repartition(4, 
$"offset") .show(5) }
}
{code}
```


was (Author: hryhoriev.nick):
I check spark 3, it's the same.
Maybe in the case of repartition by the range you are right.
But the same issue happens in  Sort and repartition.
```

package org.apache.spark.af.it

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object TestKafka {

 def main(args: Array[String]): Unit = {
 implicit val spark: SparkSession = 
SparkSession.builder().master("local").getOrCreate()

 spark
 .readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", 
"kafka-20001-026-prod.eu1.appsflyer.com:9092")
 .option("assign", """\{"sessions":[0,1] }""")
 .load()
 .writeStream
 .foreachBatch(process _)
 .start()
 .awaitTermination(1000000)

 }

 private def process(data: Dataset[Row], batchId: Long)(implicit spark: 
SparkSession): Unit = {
 import spark.implicits._
 spark.sparkContext.setJobDescription(s"session [parquet] - $batchId")
 data
 .sort($"partition", $"offset")
 .repartition(4, $"offset")
 .show(5)
 }

}
```

> Spark Structure streaming read data twice per every micro-batch.
> ----------------------------------------------------------------
>
>                 Key: SPARK-31427
>                 URL: https://issues.apache.org/jira/browse/SPARK-31427
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.3
>            Reporter: Nick Hryhoriev
>            Priority: Major
>
> I have a very strange issue with spark structure streaming. Spark structure 
> streaming creates two spark jobs for every micro-batch. As a result, read 
> data from Kafka twice. Here is a simple code snippet.
>  
> {code:java}
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.streaming.Trigger
> object CheckHowSparkReadFromKafka {
>   def main(args: Array[String]): Unit = {
>     val session = SparkSession.builder()
>       .config(new SparkConf()
>         .setAppName(s"simple read from kafka with repartition")
>         .setMaster("local[*]")
>         .set("spark.driver.host", "localhost"))
>       .getOrCreate()
>     val testPath = "/tmp/spark-test"
>     FileSystem.get(session.sparkContext.hadoopConfiguration).delete(new 
> Path(testPath), true)
>     import session.implicits._
>     val stream = session
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers",        "kafka-20002-prod:9092")
>       .option("subscribe", "topic")
>       .option("maxOffsetsPerTrigger", 1000)
>       .option("failOnDataLoss", false)
>       .option("startingOffsets", "latest")
>       .load()
>       .repartitionByRange( $"offset")
>       .writeStream
>       .option("path", testPath + "/data")
>       .option("checkpointLocation", testPath + "/checkpoint")
>       .format("parquet")
>       .trigger(Trigger.ProcessingTime(10.seconds))
>       .start()
>     stream.processAllAvailable()
> {code}
> This happens because if {{.repartitionByRange( $"offset")}}, if I remove this 
> line, all good. But with spark create two jobs, one with 1 stage just read 
> from Kafka, the second with 3 stage read -> shuffle -> write. So the result 
> of the first job never used.
> This has a significant impact on performance. Some of my Kafka topics have 
> 1550 partitions, so read them twice is a big deal. In case I add cache, 
> things going better, but this is not a way for me. In local mode, the first 
> job in batch takes less than 0.1 ms, except batch with index 0. But in YARN 
> cluster and Messos both jobs fully expected and on my topics take near 1.2 
> min.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to