But doExecute is not called?
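If the query ends in a collect(), I don't think CollectLimit's doExecute runs at all: executeCollect calls child.executeTake(limit), which runs jobs over a few partitions at a time and stops as soon as it has enough rows, so no single-partition shuffle happens. A rough standalone sketch of that incremental-take pattern (illustrative only, not Spark's actual code; the real executeTake grows the per-round partition batch differently):

import org.apache.spark.{SparkConf, SparkContext}

object IncrementalTakeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("take-sketch").setMaster("local[4]"))
    val rdd = sc.parallelize(1L to 1000000L, 100)
    val limit = 10

    val buf = scala.collection.mutable.ArrayBuffer.empty[Long]
    var scanned = 0
    while (buf.size < limit && scanned < rdd.getNumPartitions) {
      // run a job on a small, growing batch of partitions per round,
      // instead of shuffling every partition into one reducer
      val batch = math.max(1, scanned)
      val parts = scanned until math.min(scanned + batch, rdd.getNumPartitions)
      val remaining = limit - buf.size
      val results = sc.runJob(
        rdd, (it: Iterator[Long]) => it.take(remaining).toArray, parts)
      buf ++= results.flatten.take(remaining)
      scanned += parts.size
    }
    println(buf.mkString(", "))
    sc.stop()
  }
}

In local mode this prints the first 10 values after touching only the first couple of partitions.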
On Mon, Apr 18, 2016 at 10:32 PM, Zhan Zhang <[email protected]> wrote:
> Hi Reynold,
>
> I just checked the code for CollectLimit; there is a shuffle that
> collects all rows into one partition.
>
> protected override def doExecute(): RDD[InternalRow] = {
>   val shuffled = new ShuffledRowRDD(
>     ShuffleExchange.prepareShuffleDependency(
>       child.execute(), child.output, SinglePartition, serializer))
>   shuffled.mapPartitionsInternal(_.take(limit))
> }
>
> Thus, there is no way to avoid processing all data before the shuffle. I
> think that is the reason. Do I understand correctly?
>
> Thanks.
>
> Zhan Zhang
> On Apr 18, 2016, at 10:08 PM, Reynold Xin <[email protected]> wrote:
>
> Unless I'm really missing something, I don't think so. As I said, it goes
> through an iterator, and after processing each stream-side row we do a
> shouldStop check. The generated code looks like:
>
> /* 094 */   protected void processNext() throws java.io.IOException {
> /* 095 */     /*** PRODUCE: Project [id#79L] */
> /* 096 */
> /* 097 */     /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */
> /* 098 */
> /* 099 */     /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */
> /* 100 */
> /* 101 */     // initialize Range
> /* 102 */     if (!range_initRange) {
> /* 103 */       range_initRange = true;
> /* 104 */       initRange(partitionIndex);
> /* 105 */     }
> /* 106 */
> /* 107 */     while (!range_overflow && range_number < range_partitionEnd) {
> /* 108 */       long range_value = range_number;
> /* 109 */       range_number += 1L;
> /* 110 */       if (range_number < range_value ^ 1L < 0) {
> /* 111 */         range_overflow = true;
> /* 112 */       }
> /* 113 */
> /* 114 */       /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */
> /* 115 */
> /* 116 */       // generate join key for stream side
> /* 117 */
> /* 118 */       // find matches from HashedRelation
> /* 119 */       UnsafeRow bhj_matched = false ? null: (UnsafeRow)bhj_relation.getValue(range_value);
> /* 120 */       if (bhj_matched == null) continue;
> /* 121 */
> /* 122 */       bhj_metricValue.add(1);
> /* 123 */
> /* 124 */       /*** CONSUME: Project [id#79L] */
> /* 125 */
> /* 126 */       System.out.println("i got one row");
> /* 127 */
> /* 128 */       /*** CONSUME: WholeStageCodegen */
> /* 129 */
> /* 130 */       project_rowWriter.write(0, range_value);
> /* 131 */       append(project_result);
> /* 132 */
> */* 133 */       if (shouldStop()) return;*
> /* 134 */     }
> /* 135 */   }
> /* 136 */ }
>
>
> shouldStop returns true once we go past the limit, so processNext returns.
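>
> A minimal sketch of that handshake in Scala (illustrative only; the real
> code is generated Java driving a BufferedRowIterator, and the limit is
> enforced by whoever consumes the iterator):
>
> class LimitedConsumer(limit: Int) {
>   private val buffer = scala.collection.mutable.ArrayBuffer.empty[Long]
>
>   def append(row: Long): Unit = buffer += row
>
>   // true once `limit` rows have been produced, so processNext returns
>   def shouldStop(): Boolean = buffer.size >= limit
>
>   // stand-in for the generated processNext: probe the build side for
>   // every stream-side value, checking shouldStop after each output row
>   def processNext(stream: Iterator[Long], buildSide: Long => Boolean): Unit = {
>     while (stream.hasNext) {
>       val v = stream.next()
>       if (buildSide(v)) {
>         append(v)
>         if (shouldStop()) return
>       }
>     }
>   }
>
>   def rows: Seq[Long] = buffer.toSeq
> }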
>
>
>
> On Mon, Apr 18, 2016 at 9:44 PM, Zhan Zhang <[email protected]>
> wrote:
>
>> From the physical plan, the limit is one level above the
>> WholeStageCodegen, so I don’t think shouldStop would work here. To make
>> it work, the limit has to be part of the WholeStageCodegen.
>>
>> Correct me if I am wrong.
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> On Apr 18, 2016, at 11:09 AM, Reynold Xin <[email protected]> wrote:
>>
>> I could be wrong but I think we currently do that through whole stage
>> codegen. After processing every row on the stream side, the generated code
>> for broadcast join checks whether it has hit the limit or not (through this
>> thing called shouldStop).
>>
>> It is not the most optimal solution, because a single stream-side row
>> might produce multiple matches, but it is usually not a problem.
>>
>>
>> On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <[email protected]>
>> wrote:
>>
>>> While you can't automatically push the limit *through* the join, we
>>> could push it *into* the join (stop processing after generating 10
>>> records). I believe that is what Rajesh is suggesting.
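>>>
>>> For instance, wrapping the join's output iterator in take(n) stops the
>>> stream side from being consumed once n matches exist (a sketch with
>>> made-up inputs, not the actual BroadcastHashJoin code):
>>>
>>> val streamSide: Iterator[Long] = (0L until 1000000L).iterator
>>> val buildSide: Map[Long, String] =
>>>   (0L to 100L by 2L).map(k => k -> ("row" + k)).toMap
>>>
>>> // iterators are lazy: take(10) stops pulling from streamSide after
>>> // the 10th match instead of scanning all million values
>>> val joined = streamSide.flatMap(k => buildSide.get(k).map(v => (k, v)))
>>> println(joined.take(10).toList)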
>>>
>>> On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <[email protected]> wrote:
>>>
>>>> I am not sure you can push a limit through a join. This becomes
>>>> problematic if not all keys are present on both sides; in such a case a
>>>> pushed-down limit can produce fewer rows than the set limit.
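>>>>
>>>> A toy illustration of the difference (plain Scala, made-up keys):
>>>>
>>>> val streamKeys = Seq(1L, 2L, 3L, 4L)
>>>> val buildKeys  = Set(3L, 4L)
>>>>
>>>> // limit pushed *through* (applied before the join): can come up short
>>>> streamKeys.take(2).filter(buildKeys)                   // => Seq()
>>>>
>>>> // limit pushed *into* (early exit on the join's output): correct
>>>> streamKeys.iterator.filter(buildKeys).take(2).toList   // => List(3, 4)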
>>>>
>>>> This might be a rare case in which whole-stage codegen is slower,
>>>> because we need to buffer the result of such a stage. You could try
>>>> disabling it by setting "spark.sql.codegen.wholeStage" to false.
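>>>>
>>>> For example, assuming you have a SQLContext in hand (the setting can
>>>> also be passed as --conf on spark-submit):
>>>>
>>>> sqlContext.setConf("spark.sql.codegen.wholeStage", "false")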
>>>>
>>>> 2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <[email protected]>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I ran the following query in Spark (latest master codebase) and it
>>>>> took a lot of time to complete even though it was a broadcast hash join.
>>>>>
>>>>> It appears that the limit computation is done only after the complete
>>>>> join has been computed. Shouldn't the limit condition be pushed into
>>>>> BroadcastHashJoin (wherein it would stop processing after generating 10
>>>>> rows)? Please let me know if my understanding of this is wrong.
>>>>>
>>>>>
>>>>> select l_partkey from lineitem, partsupp where ps_partkey=l_partkey
>>>>> limit 10;
>>>>>
>>>>> >>>>
>>>>> == Physical Plan ==
>>>>> CollectLimit 10
>>>>> +- WholeStageCodegen
>>>>>    :  +- Project [l_partkey#893]
>>>>>    :     +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, BuildRight, None
>>>>>    :        :- Project [l_partkey#893]
>>>>>    :        :  +- Filter isnotnull(l_partkey#893)
>>>>>    :        :     +- Scan HadoopFiles[l_partkey#893] Format: ORC, PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct<l_partkey:int>
>>>>>    :        +- INPUT
>>>>>    +- BroadcastExchange HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as bigint)),List(ps_partkey#908))
>>>>>       +- WholeStageCodegen
>>>>>          :  +- Project [ps_partkey#908]
>>>>>          :     +- Filter isnotnull(ps_partkey#908)
>>>>>          :        +- Scan HadoopFiles[ps_partkey#908] Format: ORC, PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct<ps_partkey:int>
>>>>> >>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> ~Rajesh.B
>>>>>
>>>>
>>>>
>>>
>>
>>
>
>