[ https://issues.apache.org/jira/browse/SPARK-31427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nick Hryhoriev resolved SPARK-31427.
------------------------------------
    Resolution: Invalid

> Spark Structured Streaming reads data twice per micro-batch.
> ------------------------------------------------------------
>
>                 Key: SPARK-31427
>                 URL: https://issues.apache.org/jira/browse/SPARK-31427
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.3, 2.4.7, 3.0.1, 3.0.2
>            Reporter: Nick Hryhoriev
>            Priority: Major
>
> I have hit a very strange issue with Spark Structured Streaming: it creates two Spark jobs for every micro-batch and, as a result, reads the data from Kafka twice. Here is a simple code snippet.
>
> {code:scala}
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.streaming.Trigger
>
> import scala.concurrent.duration._
>
> object CheckHowSparkReadFromKafka {
>
>   def main(args: Array[String]): Unit = {
>     val session = SparkSession.builder()
>       .config(new SparkConf()
>         .setAppName("simple read from kafka with repartition")
>         .setMaster("local[*]")
>         .set("spark.driver.host", "localhost"))
>       .getOrCreate()
>
>     // Clean up output and checkpoint locations from previous runs.
>     val testPath = "/tmp/spark-test"
>     FileSystem.get(session.sparkContext.hadoopConfiguration).delete(new Path(testPath), true)
>
>     import session.implicits._
>     val stream = session
>       .readStream
>       .format("kafka")
>       .option("kafka.bootstrap.servers", "kafka-20002-prod:9092")
>       .option("subscribe", "topic")
>       .option("maxOffsetsPerTrigger", 1000)
>       .option("failOnDataLoss", false)
>       .option("startingOffsets", "latest")
>       .load()
>       .repartitionByRange($"offset")
>       .writeStream
>       .option("path", testPath + "/data")
>       .option("checkpointLocation", testPath + "/checkpoint")
>       .format("parquet")
>       .trigger(Trigger.ProcessingTime(10.seconds))
>       .start()
>
>     stream.processAllAvailable()
>   }
> }
> {code}
> The cause is {{.repartitionByRange($"offset")}}: if I remove this line, everything is fine. With it, Spark creates two jobs per micro-batch: one with a single stage that only reads from Kafka, and a second with three stages (read -> shuffle -> write). The result of the first job is never used.
> This has a significant impact on performance. Some of my Kafka topics have 1550 partitions, so reading them twice is a big deal. If I add a cache, things get better, but that is not an option for me. In local mode, the first job of each batch takes less than 0.1 ms, except the batch with index 0; but on YARN and Mesos clusters both jobs execute fully, and on my topics they take about 1.2 min.
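For readers who land here: the "add a cache" mitigation the reporter mentions can be sketched with {{foreachBatch}} (available since Spark 2.4), which exposes each micro-batch as a plain DataFrame so it can be persisted before the range shuffle, leaving a single Kafka scan per trigger. This is a minimal illustrative sketch, not the reporter's code or a confirmed fix; the broker, topic, and paths are placeholders carried over from the snippet above.

{code:scala}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

import scala.concurrent.duration._

object CacheBeforeRepartitionSketch {

  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("read from kafka once, repartition from cache")
      .master("local[*]")
      .getOrCreate()
    import session.implicits._

    // A typed function value sidesteps the Scala/Java foreachBatch
    // overload ambiguity that Scala 2.12 lambdas can hit.
    val writeBatch: (DataFrame, Long) => Unit = (batch, _) => {
      // Persist the micro-batch so the range shuffle below reuses it
      // instead of triggering a second read of the Kafka source.
      batch.persist()
      batch
        .repartitionByRange($"offset")
        .write
        .mode("append")
        .parquet("/tmp/spark-test/data") // placeholder output path
      batch.unpersist()
    }

    val query = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka-20002-prod:9092") // placeholder broker
      .option("subscribe", "topic")                               // placeholder topic
      .option("maxOffsetsPerTrigger", 1000)
      .option("startingOffsets", "latest")
      .load()
      .writeStream
      .option("checkpointLocation", "/tmp/spark-test/checkpoint") // placeholder path
      .trigger(Trigger.ProcessingTime(10.seconds))
      .foreachBatch(writeBatch)
      .start()

    query.processAllAvailable()
  }
}
{code}

Note that {{repartitionByRange}} determines partition boundaries by sampling its input, which is consistent with the extra read-only job the reporter observed; persisting the batch merely makes that sampling pass read from cache rather than from Kafka.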