[ https://issues.apache.org/jira/browse/SPARK-31427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17300773#comment-17300773 ]
Nick Hryhoriev edited comment on SPARK-31427 at 3/13/21, 9:22 AM:
------------------------------------------------------------------

I checked Spark 3, and it's the same. Maybe you are right in the case of repartition by range, but the same issue happens with sort and repartition.

{code:java}
package org.apache.spark.af.it

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object TestKafka {

  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka-20001-026-prod.eu1.appsflyer.com:9092")
      .option("assign", """{"sessions":[0,1]}""")
      .load()
      .writeStream
      .foreachBatch(process _)
      .start()
      .awaitTermination(1000000)
  }

  private def process(data: Dataset[Row], batchId: Long)(implicit spark: SparkSession): Unit = {
    import spark.implicits._
    spark.sparkContext.setJobDescription(s"session [parquet] - $batchId")
    data
      .sort($"partition", $"offset")
      .repartition(4, $"offset")
      .show(5)
  }
}
{code}
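A minimal sketch (not the reporter's code) of the cache workaround mentioned in the issue description below, applied to the process() method above. It assumes the same imports as the snippet above; the processWithCache name and the persist()/unpersist() placement are illustrative assumptions.

{code:java}
// Sketch only: persist the micro-batch before the shuffle so the Kafka
// source is scanned once per batch (the "cache" workaround the reporter
// mentions but does not want to rely on).
private def processWithCache(data: Dataset[Row], batchId: Long)(implicit spark: SparkSession): Unit = {
  import spark.implicits._
  spark.sparkContext.setJobDescription(s"session [parquet, cached] - $batchId")
  val cached = data.persist()   // materialize the Kafka read once
  try {
    cached
      .sort($"partition", $"offset")
      .repartition(4, $"offset")
      .show(5)
  } finally {
    cached.unpersist()          // drop the cached batch after use
  }
}
{code}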
> Spark Structured Streaming reads data twice per micro-batch.
> -------------------------------------------------------------
>
>                 Key: SPARK-31427
>                 URL: https://issues.apache.org/jira/browse/SPARK-31427
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.3
>            Reporter: Nick Hryhoriev
>            Priority: Major
>
> I have a very strange issue with Spark Structured Streaming: it creates two Spark jobs for every micro-batch and, as a result, reads the data from Kafka twice. Here is a simple code snippet.
>
> {code:java}
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.streaming.Trigger
>
> import scala.concurrent.duration._
>
> object CheckHowSparkReadFromKafka {
>
>   def main(args: Array[String]): Unit = {
>     val session = SparkSession.builder()
>       .config(new SparkConf()
>         .setAppName(s"simple read from kafka with repartition")
>         .setMaster("local[*]")
>         .set("spark.driver.host", "localhost"))
>       .getOrCreate()
>     val testPath = "/tmp/spark-test"
>     FileSystem.get(session.sparkContext.hadoopConfiguration).delete(new Path(testPath), true)
>     import session.implicits._
>     val stream = session
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "kafka-20002-prod:9092")
>       .option("subscribe", "topic")
>       .option("maxOffsetsPerTrigger", 1000)
>       .option("failOnDataLoss", false)
>       .option("startingOffsets", "latest")
>       .load()
>       .repartitionByRange($"offset")
>       .writeStream
>       .option("path", testPath + "/data")
>       .option("checkpointLocation", testPath + "/checkpoint")
>       .format("parquet")
>       .trigger(Trigger.ProcessingTime(10.seconds))
>       .start()
>     stream.processAllAvailable()
>   }
> }
> {code}
> This happens because of {{.repartitionByRange($"offset")}}: if I remove this line, all is good. But with it, Spark creates two jobs per micro-batch: one with a single stage that just reads from Kafka, and a second with three stages (read -> shuffle -> write). The result of the first job is never used.
> This has a significant impact on performance. Some of my Kafka topics have 1550 partitions, so reading them twice is a big deal. If I add a cache, things get better, but that is not an option for me. In local mode, the first job in each batch takes less than 0.1 ms, except the batch with index 0. But on a YARN cluster and on Mesos both jobs execute fully and on my topics take nearly 1.2 min.
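A minimal sketch (not from the original report) of how the two jobs per micro-batch can be observed programmatically rather than in the Spark UI, assuming the {{session}} value from the snippet above and that the listener is registered before the query starts:

{code:java}
// Sketch only: log each job as it starts so the extra read-only job per
// micro-batch described above becomes visible without the Spark UI.
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

session.sparkContext.addSparkListener(new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // setJobDescription() values show up under the "spark.job.description" property
    val description = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.job.description")))
      .getOrElse("<none>")
    println(s"job ${jobStart.jobId}: ${jobStart.stageInfos.size} stage(s), description=$description")
  }
})
{code}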