[ https://issues.apache.org/jira/browse/SPARK-21443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16090242#comment-16090242 ]
Kazuaki Ishizaki commented on SPARK-21443:
------------------------------------------

These two optimizations, {{InferFiltersFromConstraints}} and {{PruneFilters}}, are known to be time-consuming. Since it is not easy to fix the root cause, the Spark community introduced an option, {{spark.sql.constraintPropagation.enabled}}, to disable these optimizations in [this PR|https://github.com/apache/spark/pull/17186]. Is it possible to alleviate the problem by using this option?
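As a minimal sketch (untested; the {{appName}} and {{master}} values are simply copied from the repro below), the option can be set when the session is built, or toggled on an existing session:

{code:java}
import org.apache.spark.sql.SparkSession

// Disable constraint propagation at session creation, so the optimizer
// skips the constraint work behind InferFiltersFromConstraints/PruneFilters.
val spark = SparkSession
  .builder
  .appName("TestFlow")
  .master("local[8]")
  .config("spark.sql.constraintPropagation.enabled", "false")
  .getOrCreate()

// Or toggle it at runtime on an existing session:
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
{code}

Note that with constraint propagation disabled, filters that would otherwise be inferred from constraints are no longer added, so the executed plan may be somewhat less optimized; the trade-off is the much shorter planning time.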
> Very long planning duration for queries with lots of operations
> ----------------------------------------------------------------
>
>                 Key: SPARK-21443
>                 URL: https://issues.apache.org/jira/browse/SPARK-21443
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Eyal Zituny
>
> Creating a streaming query with a large number of operations and fields (100+) results in a very long query planning phase. In the example below, the planning phase took 35 seconds while the actual batch execution took only 1.3 seconds.
> After some investigation, I found that the root cause is two optimizer rules which seem to take most of the planning time: InferFiltersFromConstraints and PruneFilters.
> I would suggest the following:
> # fix the inefficient optimizer rules
> # add warn-level logging if a rule has taken more than xx ms
> # allow custom removal of optimizer rules (the opposite of spark.experimental.extraOptimizations)
> # reuse query plans (optional) where possible
> Reproducing this issue can be done with the script below, which simulates the scenario:
> {code:java}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
> import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQueryListener}
>
> case class Product(pid: Long, name: String, price: Long, ts: Long = System.currentTimeMillis())
>
> case class Events(eventId: Long, eventName: String, productId: Long) {
>   def this(id: Long) = this(id, s"event$id", id % 100)
> }
>
> object SparkTestFlow {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder
>       .appName("TestFlow")
>       .master("local[8]")
>       .getOrCreate()
>
>     // Print the progress of every non-empty micro-batch.
>     spark.sqlContext.streams.addListener(new StreamingQueryListener {
>       override def onQueryStarted(event: QueryStartedEvent): Unit = {}
>       override def onQueryProgress(event: QueryProgressEvent): Unit = {
>         if (event.progress.numInputRows > 0) {
>           println(event.progress.toString())
>         }
>       }
>       override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
>     })
>
>     import spark.implicits._
>     implicit val sqlContext = spark.sqlContext
>     import org.apache.spark.sql.functions.expr
>
>     // Static lookup table joined against the stream.
>     val seq = (1L to 100L).map(i => Product(i, s"name$i", 10L * i))
>     val lookupTable = spark.createDataFrame(seq)
>
>     val inputData = MemoryStream[Events]
>     inputData.addData((1L to 100L).map(i => new Events(i)))
>     val events = inputData.toDF()
>       .withColumn("w1", expr("0"))
>       .withColumn("x1", expr("0"))
>       .withColumn("y1", expr("0"))
>       .withColumn("z1", expr("0"))
>
>     val numberOfSelects = 40 // set to 100+ and the planning takes forever
>     // Stack selectExpr projections; each step adds four derived columns.
>     val dfWithSelectsExpr = (2 to numberOfSelects).foldLeft(events) { (df, i) =>
>       val arr = df.columns ++ Array(
>         s"w${i - 1} + rand() as w$i",
>         s"x${i - 1} + rand() as x$i",
>         s"y${i - 1} + 2 as y$i",
>         s"z${i - 1} + 1 as z$i")
>       df.selectExpr(arr: _*)
>     }
>
>     val withJoinAndFilter = dfWithSelectsExpr
>       .join(lookupTable, expr("productId = pid"))
>       .filter("productId < 50")
>
>     val query = withJoinAndFilter.writeStream
>       .outputMode("append")
>       .format("console")
>       .trigger(ProcessingTime(2000))
>       .start()
>
>     query.processAllAvailable()
>     spark.stop()
>   }
> }
> {code}
> The query progress output will show:
> {code:java}
> "durationMs" : {
>   "addBatch" : 1310,
>   "getBatch" : 6,
>   "getOffset" : 0,
>   "queryPlanning" : 36924,
>   "triggerExecution" : 38297,
>   "walCommit" : 33
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org