Unless I'm really missing something, I don't think so. As I said, it goes
through an iterator, and after processing each stream-side row we do a
shouldStop check. The generated code looks like this:
protected void processNext() throws java.io.IOException {
  /*** PRODUCE: Project [id#79L] */

  /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */

  /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */

  // initialize Range
  if (!range_initRange) {
    range_initRange = true;
    initRange(partitionIndex);
  }

  while (!range_overflow && range_number < range_partitionEnd) {
    long range_value = range_number;
    range_number += 1L;
    if (range_number < range_value ^ 1L < 0) {
      range_overflow = true;
    }

    /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */

    // generate join key for stream side

    // find matches from HashedRelation
    UnsafeRow bhj_matched = false ? null : (UnsafeRow) bhj_relation.getValue(range_value);
    if (bhj_matched == null) continue;

    bhj_metricValue.add(1);

    /*** CONSUME: Project [id#79L] */

    System.out.println("i got one row");

    /*** CONSUME: WholeStageCodegen */

    project_rowWriter.write(0, range_value);
    append(project_result);

    if (shouldStop()) return;   // <-- the check in question
  }
}
shouldStop() returns true after each appended row; once we go past the limit
the consumer stops pulling, so the loop is never resumed.
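
For context, here is a simplified sketch of the iterator contract involved
(modeled on Spark's BufferedRowIterator, with the row type reduced to Object;
illustrative only, not the actual class):

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

abstract class BufferedRowIteratorSketch {
    private final Queue<Object> currentRows = new ArrayDeque<>();

    // The generated processNext() calls append() for each produced row.
    protected void append(Object row) { currentRows.add(row); }

    // True whenever a row is buffered, so the generated loop hands
    // control back to the caller after each appended row.
    protected boolean shouldStop() { return !currentRows.isEmpty(); }

    // Filled in by generated code like the processNext() shown above.
    protected abstract void processNext() throws IOException;

    public boolean hasNext() throws IOException {
        if (currentRows.isEmpty()) processNext();
        return !currentRows.isEmpty();
    }

    public Object next() { return currentRows.poll(); }
}

A consumer like CollectLimit pulls ten rows through hasNext()/next() and then
stops, so the generated while loop is never re-entered.
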
On Mon, Apr 18, 2016 at 9:44 PM, Zhan Zhang <[email protected]> wrote:
> From the physical plan, the limit is one level above the WholeStageCodegen,
> so I don't think shouldStop would work here. To make it work, the limit has
> to be part of the WholeStageCodegen.
>
> Correct me if I am wrong.
>
> Thanks.
>
> Zhan Zhang
>
> On Apr 18, 2016, at 11:09 AM, Reynold Xin <[email protected]> wrote:
>
> I could be wrong, but I think we currently do that through whole-stage
> codegen. After processing every row on the stream side, the generated code
> for the broadcast join checks whether it has hit the limit (through this
> thing called shouldStop).
>
> It is not optimal, because a single stream-side row might produce multiple
> matches, but that is usually not a problem.
>
>
> On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <[email protected]> wrote:
>
>> While you can't automatically push the limit *through* the join, we could
>> push it *into* the join (stop processing after generating 10 records). I
>> believe that is what Rajesh is suggesting.
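>>
>> For illustration, a hypothetical sketch of what pushing the limit *into*
>> the join could look like (plain Java with made-up names, not Spark's
>> actual codegen): the stream-side loop keeps its own counter and bails out
>> after the requested number of matches, instead of relying on the consumer
>> to stop pulling rows.
>>
>> import java.util.ArrayList;
>> import java.util.List;
>> import java.util.Map;
>>
>> class LimitedHashJoinSketch {
>>     // Probes the build-side hash table and stops after `limit` matches.
>>     static List<Long> joinWithLimit(List<Long> streamSide,
>>                                     Map<Long, Long> buildSide, int limit) {
>>         List<Long> out = new ArrayList<>();
>>         for (Long key : streamSide) {
>>             if (!buildSide.containsKey(key)) continue; // no match, keep scanning
>>             out.add(key);
>>             if (out.size() >= limit) break;            // the pushed-down limit
>>         }
>>         return out;
>>     }
>> }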
>>
>> On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <
>> [email protected]> wrote:
>>
>>> I am not sure if you can push a limit through a join. This becomes
>>> problematic if not all keys are present on both sides; in such a case a
>>> limit can produce fewer rows than the set limit.
>>>
>>> This might be a rare case in which whole-stage codegen is slower, because
>>> we need to buffer the result of such a stage. You could try disabling it
>>> by setting "spark.sql.codegen.wholeStage" to false.
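>>>
>>> For example, with the SQLContext API of that era this could look like
>>> the following (a sketch; the app setup and class name are assumptions,
>>> only the "spark.sql.codegen.wholeStage" key comes from this thread):
>>>
>>> import org.apache.spark.SparkConf;
>>> import org.apache.spark.api.java.JavaSparkContext;
>>> import org.apache.spark.sql.SQLContext;
>>>
>>> public class CodegenOff {
>>>     public static void main(String[] args) {
>>>         SparkConf conf = new SparkConf()
>>>             .setAppName("codegen-off").setMaster("local[*]");
>>>         JavaSparkContext sc = new JavaSparkContext(conf);
>>>         SQLContext sqlContext = new SQLContext(sc);
>>>         // Disable whole-stage codegen, then re-run the query to compare.
>>>         sqlContext.setConf("spark.sql.codegen.wholeStage", "false");
>>>         sc.stop();
>>>     }
>>> }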
>>>
>>> 2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <[email protected]>:
>>>
>>>> Hi,
>>>>
>>>> I ran the following query in Spark (latest master codebase) and it took
>>>> a long time to complete even though it was a broadcast hash join.
>>>>
>>>> It appears that the limit is computed only after the complete join has
>>>> been evaluated. Shouldn't the limit be pushed down to BroadcastHashJoin
>>>> (so that it stops processing after generating 10 rows)? Please let me
>>>> know if my understanding of this is wrong.
>>>>
>>>>
>>>> select l_partkey from lineitem, partsupp where ps_partkey=l_partkey
>>>> limit 10;
>>>>
>>>>
>>>> == Physical Plan ==
>>>> CollectLimit 10
>>>> +- WholeStageCodegen
>>>>    :  +- Project [l_partkey#893]
>>>>    :     +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, BuildRight, None
>>>>    :        :- Project [l_partkey#893]
>>>>    :        :  +- Filter isnotnull(l_partkey#893)
>>>>    :        :     +- Scan HadoopFiles[l_partkey#893] Format: ORC, PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct<l_partkey:int>
>>>>    :        +- INPUT
>>>>    +- BroadcastExchange HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as bigint)),List(ps_partkey#908))
>>>>       +- WholeStageCodegen
>>>>          :  +- Project [ps_partkey#908]
>>>>          :     +- Filter isnotnull(ps_partkey#908)
>>>>          :        +- Scan HadoopFiles[ps_partkey#908] Format: ORC, PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct<ps_partkey:int>
>>>>
>>>> --
>>>> ~Rajesh.B