Eyal Zituny created SPARK-21443:
-----------------------------------

             Summary: Very long planning duration for queries with lots of operations
                 Key: SPARK-21443
                 URL: https://issues.apache.org/jira/browse/SPARK-21443
             Project: Spark
          Issue Type: Bug
          Components: SQL, Structured Streaming
    Affects Versions: 2.2.0
            Reporter: Eyal Zituny


Creating a streaming query with a large number of operations and fields (100+) 
results in a very long query planning phase. In the example below, the planning 
phase took 35 seconds while the actual batch execution took only 1.3 seconds.
After some investigation, I found that the root cause is 2 optimizer rules 
which seem to take most of the planning time: InferFiltersFromConstraints and 
PruneFilters
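
For anyone who wants to confirm which rules dominate on their own workload: 
Catalyst's RuleExecutor appears to accumulate per-rule timing in a global map 
that can be printed after a batch runs. A minimal sketch (this is an internal 
API, so treat it as a debugging aid only; {{query}} refers to the streaming 
query from the reproduction script below):

{code:java}
// Minimal sketch, assuming Spark 2.x internals: RuleExecutor keeps a global
// per-rule timing map, and dumpTimeSpent() renders the accumulated totals.
// Internal Catalyst API -- suitable for ad-hoc debugging only.
import org.apache.spark.sql.catalyst.rules.RuleExecutor

query.processAllAvailable()           // run at least one batch first
println(RuleExecutor.dumpTimeSpent()) // per-rule totals; InferFiltersFromConstraints
                                      // and PruneFilters dominate in this scenario
{code}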

I would suggest the following:
# fix the inefficient optimizer rules
# add WARN-level logging if a rule has taken more than xx ms
# allow custom removal of optimizer rules (the opposite of 
spark.experimental.extraOptimizations; see the sketch after this list)
# reuse query plans (optional) where possible
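
To illustrate suggestion #3, a counterpart to the existing 
spark.experimental.extraOptimizations hook might look like the sketch below. 
Note that excludedOptimizations is an invented name used only to show the 
proposed shape of the API; it does not exist in Spark:

{code:java}
// Hypothetical sketch -- "excludedOptimizations" is NOT an existing Spark API.
// It mirrors spark.experimental.extraOptimizations (which does exist) and
// would let users opt out of rules by fully qualified class name.
spark.experimental.excludedOptimizations = Seq(
  "org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints",
  "org.apache.spark.sql.catalyst.optimizer.PruneFilters")
{code}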

The issue can be reproduced with the script below, which simulates the 
scenario:


{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQueryListener}

case class Product(pid: Long, name: String, price: Long, ts: Long = System.currentTimeMillis())

case class Events(eventId: Long, eventName: String, productId: Long) {
  def this(id: Long) = this(id, s"event$id", id % 100)
}

object SparkTestFlow {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("TestFlow")
      .master("local[8]")
      .getOrCreate()

    // Print the progress of every batch that processed rows, including
    // the per-phase durations (queryPlanning, addBatch, ...).
    spark.sqlContext.streams.addListener(new StreamingQueryListener {
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        if (event.progress.numInputRows > 0) {
          println(event.progress.toString())
        }
      }
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
    })

    import spark.implicits._
    implicit val sqlContext = spark.sqlContext // required by MemoryStream
    import org.apache.spark.sql.functions.expr

    // Static lookup table joined against the stream below.
    val seq = (1L to 100L).map(i => Product(i, s"name$i", 10L * i))
    val lookupTable = spark.createDataFrame(seq)

    val inputData = MemoryStream[Events]
    inputData.addData((1L to 100L).map(i => new Events(i)))

    val events = inputData.toDF()
      .withColumn("w1", expr("0"))
      .withColumn("x1", expr("0"))
      .withColumn("y1", expr("0"))
      .withColumn("z1", expr("0"))

    // Each iteration adds 4 derived columns, so the logical plan grows
    // by one projection of 4 extra expressions per step.
    val numberOfSelects = 40 // set to 100+ and the planning takes forever
    val dfWithSelectsExpr = (2 to numberOfSelects).foldLeft(events)((df, i) => {
      val arr = df.columns ++ Array(
        s"w${i - 1} + rand() as w$i",
        s"x${i - 1} + rand() as x$i",
        s"y${i - 1} + 2 as y$i",
        s"z${i - 1} + 1 as z$i")
      df.selectExpr(arr: _*)
    })

    val withJoinAndFilter = dfWithSelectsExpr
      .join(lookupTable, expr("productId = pid"))
      .filter("productId < 50")

    val query = withJoinAndFilter.writeStream
      .outputMode("append")
      .format("console")
      .trigger(ProcessingTime(2000))
      .start()

    query.processAllAvailable()
    spark.stop()
  }
}
{code}


The query progress output will show:

{code:java}
"durationMs" : {
    "addBatch" : 1310,
    "getBatch" : 6,
    "getOffset" : 0,
    "*queryPlanning*" : 36924,
    "triggerExecution" : 38297,
    "walCommit" : 33
  }
{code}



