Pushing limits down across mapping would be great. If you're used to
SQL or work frequently with lazy collections, this is behavior you
learn to expect.
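
For comparison, this is roughly how a plain lazy Scala collection
behaves (a minimal sketch, not Spark; the variable names are made up
for illustration): take() after map() only forces as many elements as
are actually requested.

    // Plain Scala, no Spark: map over an infinite lazy iterator.
    var calls = 0
    val mapped = Iterator.from(0).map { x => calls += 1; x * 2 }

    // Taking one element evaluates the mapping function exactly once.
    val first = mapped.take(1).toList
    println(s"first = $first, f was called $calls time(s)")
    // first = List(0), f was called 1 time(s)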

On 08/02/2016 02:12 PM, Sun Rui wrote:
> Spark does optimise subsequent limits, for example:
>
>     scala> df1.limit(3).limit(1).explain
>     == Physical Plan ==
>     CollectLimit 1
>     +- *SerializeFromObject [assertnotnull(input[0,
>     $line14.$read$$iw$$iw$my, true], top level non-flat input
>     object).x AS x#2]
>        +- Scan ExternalRDDScan[obj#1]
>
> However, limit cannot simply be pushed down across mapping functions,
> because the number of rows may change across them, for example with
> flatMap().
>
> It seems that limit could be pushed across map(), which won’t change
> the number of rows. Maybe there is room here for a Spark optimisation.
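>
> As a plain-Scala illustration (a hedged sketch with ordinary
> collections, not Spark internals), pushing a limit below flatMap()
> can change the result, while pushing it below map() cannot:
>
>     val xs = Seq(0, 0, 3)
>
>     // map preserves the row count, so the limit can be pushed down:
>     xs.map(_ + 1).take(2)                            // Seq(1, 1)
>     xs.take(2).map(_ + 1)                            // Seq(1, 1)
>
>     // flatMap may change the row count, so pushdown is unsafe:
>     xs.flatMap(x => Seq.fill(x)(x)).take(2)          // Seq(3, 3)
>     xs.take(2).flatMap(x => Seq.fill(x)(x)).take(2)  // Seq()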
>
>> On Aug 2, 2016, at 18:51, Maciej Szymkiewicz
>> <mszymkiew...@gmail.com> wrote:
>>
>> Thank you for your prompt response and great examples, Sun Rui, but I
>> am still confused about one thing. Do you see any particular reason
>> not to merge subsequent limits? The following case
>>
>>    (limit n (map f (limit m ds)))
>>
>> could be optimized to:
>>
>>    (map f (limit n (limit m ds)))
>>
>> and further to
>>
>>    (map f (limit (min n m) ds))
>>
>> couldn't it?
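>>
>> As a rough sketch of what that merging step could look like as a
>> Catalyst rule (illustrative only; this is not necessarily how Spark
>> actually implements it, and the rule name is made up):
>>
>>    import org.apache.spark.sql.catalyst.expressions.Least
>>    import org.apache.spark.sql.catalyst.plans.logical.{Limit, LogicalPlan}
>>    import org.apache.spark.sql.catalyst.rules.Rule
>>
>>    object MergeAdjacentLimits extends Rule[LogicalPlan] {
>>      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>>        // limit(n)(limit(m)(child))  =>  limit(min(n, m))(child)
>>        case Limit(outer, Limit(inner, child)) =>
>>          Limit(Least(Seq(outer, inner)), child)
>>      }
>>    }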
>>
>>
>> On 08/02/2016 11:57 AM, Sun Rui wrote:
>>> Based on your code, here is a simpler test case on Spark 2.0:
>>>
>>>    case class my (x: Int)
>>>    val rdd = sc.parallelize(0.until(10000), 1000).map { x => my(x) }
>>>    val df1 = spark.createDataFrame(rdd)
>>>    val df2 = df1.limit(1)
>>>    df1.map { r => r.getAs[Int](0) }.first
>>>    // Much slower than the previous line:
>>>    df2.map { r => r.getAs[Int](0) }.first
>>>
>>> Actually, Dataset.first is equivalent to Dataset.limit(1).collect, so
>>> check the physical plans of the two cases:
>>>
>>>    scala> df1.map { r => r.getAs[Int](0) }.limit(1).explain
>>>    == Physical Plan ==
>>>    CollectLimit 1
>>>    +- *SerializeFromObject [input[0, int, true] AS value#124]
>>>       +- *MapElements <function1>, obj#123: int
>>>          +- *DeserializeToObject createexternalrow(x#74,
>>>    StructField(x,IntegerType,false)), obj#122: org.apache.spark.sql.Row
>>>             +- Scan ExistingRDD[x#74]
>>>
>>>    scala> df2.map { r => r.getAs[Int](0) }.limit(1).explain
>>>    == Physical Plan ==
>>>    CollectLimit 1
>>>    +- *SerializeFromObject [input[0, int, true] AS value#131]
>>>       +- *MapElements <function1>, obj#130: int
>>>          +- *DeserializeToObject createexternalrow(x#74,
>>>    StructField(x,IntegerType,false)), obj#129: org.apache.spark.sql.Row
>>>             +- *GlobalLimit 1
>>>                +- Exchange SinglePartition
>>>                   +- *LocalLimit 1
>>>                      +- Scan ExistingRDD[x#74]
>>>
>>>
>>> For the first case, the difference comes from an optimisation in the
>>> CollectLimitExec physical operator: it first fetches the first
>>> partition to get the limit number of rows, 1 in this case; if that is
>>> not enough, it fetches more partitions until the desired limit is
>>> reached. So generally, if the first partition is not empty, only the
>>> first partition is computed and fetched, and the other partitions are
>>> never even computed.
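>>>
>>> A rough sketch of that incremental strategy (illustrative only, not
>>> the actual CollectLimitExec code; the helper name is made up): run
>>> the job on one partition at a time and stop as soon as enough rows
>>> have been collected.
>>>
>>>    import org.apache.spark.rdd.RDD
>>>    import scala.collection.mutable.ArrayBuffer
>>>
>>>    def incrementalTake[T](rdd: RDD[T], limit: Int): Seq[T] = {
>>>      val sc = rdd.sparkContext
>>>      val buf = ArrayBuffer.empty[T]
>>>      var nextPartition = 0
>>>      while (buf.size < limit && nextPartition < rdd.partitions.length) {
>>>        val remaining = limit - buf.size
>>>        // Compute only the next partition; the real implementation
>>>        // grows the number of partitions scanned per round.
>>>        val res = sc.runJob(
>>>          rdd, (it: Iterator[T]) => it.take(remaining).toSeq, Seq(nextPartition))
>>>        buf ++= res.head
>>>        nextPartition += 1
>>>      }
>>>      buf.take(limit).toList
>>>    }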
>>>
>>> However, in the second case, the optimisation in CollectLimitExec
>>> does not help, because the earlier limit operation involves a shuffle.
>>> All partitions are computed, LocalLimit(1) runs on each partition to
>>> get 1 row, and then all partitions are shuffled into a single
>>> partition. CollectLimitExec then fetches 1 row from the resulting
>>> single partition.
>>>
>>>
>>>> On Aug 2, 2016, at 09:08, Maciej Szymkiewicz
>>>> <mszymkiew...@gmail.com> wrote:
>>>>
>>>> Hi everyone,
>>>>
>>>> This doesn't look like expected behaviour, does it?
>>>>
>>>> http://stackoverflow.com/q/38710018/1560062
>>>>
>>>> A quick glance at the UI suggests that there is a shuffle involved
>>>> and the input for first is a ShuffledRowRDD.
>>>> -- 
>>>> Best regards,
>>>> Maciej Szymkiewicz
>>>
>>
>> -- 
>> Maciej Szymkiewicz
>

-- 
Maciej Szymkiewicz
