Pushing limit down across mapping would be great. If you're used to SQL or work frequently with lazy collections, this is behavior you learn to expect (a toy sketch of such a rewrite is appended at the bottom, below the quoted thread).
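For comparison, here is what plain Scala lazy collections do (just an illustrative sketch, no Spark involved): take effectively travels across map, so the mapping function is evaluated for a single element only.

  // Plain Scala iterators, no Spark: "limit" (take) effectively travels
  // across map, so f is evaluated for a single element only.
  var calls = 0
  val f = (x: Int) => { calls += 1; x * 2 }

  val result = Iterator.range(0, 10000).map(f).take(1).toList
  println(s"result = $result, f was called $calls time(s)")  // List(0), 1 time
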
On 08/02/2016 02:12 PM, Sun Rui wrote:
> Spark does optimise subsequent limits, for example:
>
> scala> df1.limit(3).limit(1).explain
> == Physical Plan ==
> CollectLimit 1
> +- *SerializeFromObject [assertnotnull(input[0, $line14.$read$$iw$$iw$my, true], top level non-flat input object).x AS x#2]
>    +- Scan ExternalRDDScan[obj#1]
>
> However, limit cannot simply be pushed down across mapping functions,
> because the number of rows may change across functions, for example,
> flatMap().
>
> It seems that limit can be pushed across map(), which won't change the
> number of rows. Maybe this is room for a Spark optimisation.
>
>> On Aug 2, 2016, at 18:51, Maciej Szymkiewicz <mszymkiew...@gmail.com> wrote:
>>
>> Thank you for your prompt response and great examples, Sun Rui, but I am
>> still confused about one thing. Do you see any particular reason not to
>> merge subsequent limits? The following case
>>
>> (limit n (map f (limit m ds)))
>>
>> could be optimized to:
>>
>> (map f (limit n (limit m ds)))
>>
>> and further to
>>
>> (map f (limit (min n m) ds))
>>
>> couldn't it?
>>
>>
>> On 08/02/2016 11:57 AM, Sun Rui wrote:
>>> Based on your code, here is a simpler test case on Spark 2.0:
>>>
>>> case class my (x: Int)
>>> val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
>>> val df1 = spark.createDataFrame(rdd)
>>> val df2 = df1.limit(1)
>>> df1.map { r => r.getAs[Int](0) }.first
>>> df2.map { r => r.getAs[Int](0) }.first // Much slower than the previous line
>>>
>>> Actually, Dataset.first is equivalent to Dataset.limit(1).collect, so
>>> check the physical plans of the two cases:
>>>
>>> scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
>>> == Physical Plan ==
>>> CollectLimit 1
>>> +- *SerializeFromObject [input[0, int, true] AS value#124]
>>>    +- *MapElements <function1>, obj#123: int
>>>       +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
>>>          +- Scan ExistingRDD[x#74]
>>>
>>> scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
>>> == Physical Plan ==
>>> CollectLimit 1
>>> +- *SerializeFromObject [input[0, int, true] AS value#131]
>>>    +- *MapElements <function1>, obj#130: int
>>>       +- *DeserializeToObject createexternalrow(x#74, StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
>>>          +- *GlobalLimit 1
>>>             +- Exchange SinglePartition
>>>                +- *LocalLimit 1
>>>                   +- Scan ExistingRDD[x#74]
>>>
>>>
>>> For the first case, it is related to an optimisation in the
>>> CollectLimitExec physical operator: it first fetches the first
>>> partition to get the limit number of rows (1 in this case); if that is
>>> not enough, it fetches more partitions until the desired limit is
>>> reached. So generally, if the first partition is not empty, only the
>>> first partition will be computed and fetched; the other partitions
>>> will not even be computed.
>>>
>>> However, in the second case, the optimisation in CollectLimitExec does
>>> not help, because the previous limit operation involves a shuffle.
>>> All partitions are computed, LocalLimit(1) runs on each partition to
>>> get 1 row, and then all partitions are shuffled into a single
>>> partition. CollectLimitExec then fetches 1 row from the resulting
>>> single partition.
>>>
>>>
>>>> On Aug 2, 2016, at 09:08, Maciej Szymkiewicz <mszymkiew...@gmail.com> wrote:
>>>>
>>>> Hi everyone,
>>>>
>>>> This doesn't look like something expected, does it?
>>>>
>>>> http://stackoverflow.com/q/38710018/1560062
>>>>
>>>> A quick glance at the UI suggests that there is a shuffle involved and
>>>> the input for first is a ShuffledRowRDD.
>>>> --
>>>> Best regards,
>>>> Maciej Szymkiewicz
>>>
>>
>> --
>> Maciej Szymkiewicz
>

--
Maciej Szymkiewicz
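
P.S. A toy sketch of the rewrites discussed above (merging adjacent limits and pushing a limit below map), over a made-up mini plan AST. This is plain Scala, not Catalyst; all the names are illustrative only.

  // Toy sketch, NOT Catalyst: a made-up mini plan AST just to illustrate
  // the rewrites discussed above.
  sealed trait Plan
  case class Leaf(name: String) extends Plan
  case class MapOp(f: String, child: Plan) extends Plan   // 1-to-1, preserves row count
  case class Limit(n: Int, child: Plan) extends Plan
  // a FlatMapOp node would NOT allow the rewrite below, since it can change row counts

  def optimize(p: Plan): Plan = p match {
    // (limit n (limit m ds))  ->  (limit (min n m) ds)
    case Limit(n, Limit(m, child)) => optimize(Limit(math.min(n, m), child))
    // (limit n (map f ds))    ->  (map f (limit n ds)), safe only because map is 1-to-1
    case Limit(n, MapOp(f, child)) => MapOp(f, optimize(Limit(n, child)))
    case Limit(n, child)           => Limit(n, optimize(child))
    case MapOp(f, child)           => MapOp(f, optimize(child))
    case leaf: Leaf                => leaf
  }

  // (limit 1 (map f (limit 3 ds)))  ==>  (map f (limit 1 ds))
  // optimize(Limit(1, MapOp("f", Limit(3, Leaf("ds")))))
  //   == MapOp("f", Limit(1, Leaf("ds")))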