But doExecute is not called?

On Mon, Apr 18, 2016 at 10:32 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:
> Hi Reynold,
>
> I just checked the code for CollectLimit; there is a shuffle happening to
> collect the rows into one partition.
>
>   protected override def doExecute(): RDD[InternalRow] = {
>     val shuffled = new ShuffledRowRDD(
>       ShuffleExchange.prepareShuffleDependency(
>         child.execute(), child.output, SinglePartition, serializer))
>     shuffled.mapPartitionsInternal(_.take(limit))
>   }
>
> Thus, there is no way to avoid processing all data before the shuffle. I
> think that is the reason. Do I understand correctly?
>
> Thanks.
>
> Zhan Zhang
>
> On Apr 18, 2016, at 10:08 PM, Reynold Xin <r...@databricks.com> wrote:
>
> Unless I'm really missing something, I don't think so. As I said, it goes
> through an iterator, and after processing each stream-side row we do a
> shouldStop check. The generated code looks like
>
> /* 094 */   protected void processNext() throws java.io.IOException {
> /* 095 */     /*** PRODUCE: Project [id#79L] */
> /* 096 */
> /* 097 */     /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */
> /* 098 */
> /* 099 */     /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */
> /* 100 */
> /* 101 */     // initialize Range
> /* 102 */     if (!range_initRange) {
> /* 103 */       range_initRange = true;
> /* 104 */       initRange(partitionIndex);
> /* 105 */     }
> /* 106 */
> /* 107 */     while (!range_overflow && range_number < range_partitionEnd) {
> /* 108 */       long range_value = range_number;
> /* 109 */       range_number += 1L;
> /* 110 */       if (range_number < range_value ^ 1L < 0) {
> /* 111 */         range_overflow = true;
> /* 112 */       }
> /* 113 */
> /* 114 */       /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */
> /* 115 */
> /* 116 */       // generate join key for stream side
> /* 117 */
> /* 118 */       // find matches from HashedRelation
> /* 119 */       UnsafeRow bhj_matched = false ? null : (UnsafeRow) bhj_relation.getValue(range_value);
> /* 120 */       if (bhj_matched == null) continue;
> /* 121 */
> /* 122 */       bhj_metricValue.add(1);
> /* 123 */
> /* 124 */       /*** CONSUME: Project [id#79L] */
> /* 125 */
> /* 126 */       System.out.println("i got one row");
> /* 127 */
> /* 128 */       /*** CONSUME: WholeStageCodegen */
> /* 129 */
> /* 130 */       project_rowWriter.write(0, range_value);
> /* 131 */       append(project_result);
> /* 132 */
> */* 133 */       if (shouldStop()) return;*
> /* 134 */     }
> /* 135 */   }
> /* 136 */ }
>
> shouldStop is false once we go past the limit.
>
> On Mon, Apr 18, 2016 at 9:44 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:
>
>> From the physical plan, the limit is one level above the
>> WholeStageCodegen, so I don't think shouldStop would work here. To make
>> it work, the limit has to be part of the WholeStageCodegen.
>>
>> Correct me if I am wrong.
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>> I could be wrong, but I think we currently do that through whole-stage
>> codegen. After processing every row on the stream side, the generated code
>> for the broadcast join checks whether it has hit the limit or not (through
>> this thing called shouldStop).
>>
>> It is not the most optimal solution, because a single stream-side row
>> might output multiple hits, but it is usually not a problem.
>>
>> On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ray.and...@gmail.com> wrote:
>>
>>> While you can't automatically push the limit *through* the join, we
>>> could push it *into* the join (stop processing after generating 10
>>> records). I believe that is what Rajesh is suggesting.
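
As a side note on "pushing the limit *into* the join": a minimal sketch of what
that could look like at the iterator level (the names and the Map-based build
side below are illustrative only, not Spark's actual operators):

  // Simplified hash-join model: stop pulling from the stream side once
  // `limit` matches have been emitted. Not Spark's real implementation.
  def joinWithLimit[K, L, R](
      streamSide: Iterator[L],
      buildSide: Map[K, R],
      streamKey: L => K,
      limit: Int): Iterator[(L, R)] = {
    streamSide
      .flatMap(row => buildSide.get(streamKey(row)).map(m => (row, m)))
      .take(limit) // Iterator.take is lazy: stream-side rows beyond the limit are never consumed
  }

Because Scala's Iterator is lazy, take(limit) stops consuming streamSide as soon
as limit rows have been produced, which is essentially what the shouldStop check
achieves inside the generated loop.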
>>>
>>> On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <hvanhov...@questtec.nl> wrote:
>>>
>>>> I am not sure if you can push a limit through a join. This becomes
>>>> problematic if not all keys are present on both sides; in such a case a
>>>> limit can produce fewer rows than the set limit.
>>>>
>>>> This might be a rare case in which whole-stage codegen is slower, due
>>>> to the fact that we need to buffer the result of such a stage. You could
>>>> try to disable it by setting "spark.sql.codegen.wholeStage" to false.
>>>>
>>>> 2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <rajesh.balamo...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I ran the following query in Spark (latest master codebase) and it
>>>>> took a lot of time to complete even though it was a broadcast hash join.
>>>>>
>>>>> It appears that the limit computation is done only after the complete
>>>>> join is computed. Shouldn't the limit condition be pushed down to
>>>>> BroadcastHashJoin (wherein it would stop processing after generating 10
>>>>> rows)? Please let me know if my understanding of this is wrong.
>>>>>
>>>>>   select l_partkey from lineitem, partsupp where ps_partkey=l_partkey limit 10;
>>>>>
>>>>> == Physical Plan ==
>>>>> CollectLimit 10
>>>>> +- WholeStageCodegen
>>>>>    :  +- Project [l_partkey#893]
>>>>>    :     +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, BuildRight, None
>>>>>    :        :- Project [l_partkey#893]
>>>>>    :        :  +- Filter isnotnull(l_partkey#893)
>>>>>    :        :     +- Scan HadoopFiles[l_partkey#893] Format: ORC, PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct<l_partkey:int>
>>>>>    :        +- INPUT
>>>>>    +- BroadcastExchange HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as bigint)),List(ps_partkey#908))
>>>>>       +- WholeStageCodegen
>>>>>          :  +- Project [ps_partkey#908]
>>>>>          :     +- Filter isnotnull(ps_partkey#908)
>>>>>          :        +- Scan HadoopFiles[ps_partkey#908] Format: ORC, PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct<ps_partkey:int>
>>>>>
>>>>> --
>>>>> ~Rajesh.B
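
For reference, the whole-stage codegen flag Herman mentions above can be turned
off per session; a minimal sketch, assuming sqlContext is whatever
SQLContext your application already has in hand:

  // Disable whole-stage code generation for the current session only.
  sqlContext.setConf("spark.sql.codegen.wholeStage", "false")

  // The same can be done from SQL:
  //   SET spark.sql.codegen.wholeStage=false;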