Eyal Zituny created SPARK-21443:
-----------------------------------

             Summary: Very long planning duration for queries with lots of operations
                 Key: SPARK-21443
                 URL: https://issues.apache.org/jira/browse/SPARK-21443
             Project: Spark
          Issue Type: Bug
          Components: SQL, Structured Streaming
    Affects Versions: 2.2.0
            Reporter: Eyal Zituny
Creating a streaming query with a large number of operations and fields (100+) results in a very long query planning phase. In the example below, the planning phase took 35 seconds while the actual batch execution took only 1.3 seconds.

After some investigation, I found that the root cause is two optimizer rules which seem to take most of the planning time: InferFiltersFromConstraints and PruneFilters.

I would suggest the following:
# fix the inefficient optimizer rules
# add warn-level logging if a rule has taken more than xx ms
# allow custom removal of optimizer rules (the opposite of spark.experimental.extraOptimizations; see the sketch after the progress output)
# reuse query plans where possible (optional)

This issue can be reproduced with the script below, which simulates the scenario:

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQueryListener}

case class Product(pid: Long, name: String, price: Long, ts: Long = System.currentTimeMillis())
case class Events(eventId: Long, eventName: String, productId: Long) {
  def this(id: Long) = this(id, s"event$id", id % 100)
}

object SparkTestFlow {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("TestFlow")
      .master("local[8]")
      .getOrCreate()
    // Print the progress of every batch that actually processed rows.
    spark.sqlContext.streams.addListener(new StreamingQueryListener {
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        if (event.progress.numInputRows > 0) {
          println(event.progress.toString())
        }
      }
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
    })
    import spark.implicits._
    implicit val sqlContext = spark.sqlContext
    import org.apache.spark.sql.functions.expr

    val seq = (1L to 100L).map(i => Product(i, s"name$i", 10L * i))
    val lookupTable = spark.createDataFrame(seq)

    val inputData = MemoryStream[Events]
    inputData.addData((1L to 100L).map(i => new Events(i)))
    val events = inputData.toDF()
      .withColumn("w1", expr("0"))
      .withColumn("x1", expr("0"))
      .withColumn("y1", expr("0"))
      .withColumn("z1", expr("0"))

    val numberOfSelects = 40 // set to 100+ and the planning takes forever
    // Stack selectExpr calls, each adding four columns derived from the previous ones.
    val dfWithSelectsExpr = (2 to numberOfSelects).foldLeft(events)((df, i) => {
      val arr = df.columns ++ Array(
        s"w${i - 1} + rand() as w$i",
        s"x${i - 1} + rand() as x$i",
        s"y${i - 1} + 2 as y$i",
        s"z${i - 1} + 1 as z$i")
      df.selectExpr(arr: _*)
    })
    val withJoinAndFilter = dfWithSelectsExpr
      .join(lookupTable, expr("productId = pid"))
      .filter("productId < 50")
    val query = withJoinAndFilter.writeStream
      .outputMode("append")
      .format("console")
      .trigger(ProcessingTime(2000))
      .start()
    query.processAllAvailable()
    spark.stop()
  }
}
{code}

The query progress output will show:

{code:java}
"durationMs" : {
  "addBatch" : 1310,
  "getBatch" : 6,
  "getOffset" : 0,
  "queryPlanning" : 36924,
  "triggerExecution" : 38297,
  "walCommit" : 33
}
{code}
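To clarify suggestion #3: the snippet below is a minimal sketch of the existing spark.experimental.extraOptimizations hook, which today only allows *adding* rules to the optimizer. The excludedOptimizations field shown in the comments is hypothetical and only illustrates the requested opposite mechanism.

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A trivial rule that returns the plan unchanged, just to show the hook.
object NoOpRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

object OptimizerHookSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("OptimizerHookSketch")
      .master("local[*]")
      .getOrCreate()

    // Existing extension point: user rules can only be APPENDED to the optimizer.
    spark.experimental.extraOptimizations = Seq(NoOpRule)

    // Requested counterpart (hypothetical, does not exist today), e.g.:
    // spark.experimental.excludedOptimizations = Seq(
    //   "org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints",
    //   "org.apache.spark.sql.catalyst.optimizer.PruneFilters")

    spark.stop()
  }
}
{code}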