But doExecute is not called?

On Mon, Apr 18, 2016 at 10:32 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:

> Hi Reynold,
>
> I just checked the code for CollectLimit; there is a shuffle that collects
> the rows into one partition.
>
> protected override def doExecute(): RDD[InternalRow] = {
>   val shuffled = new ShuffledRowRDD(
>     ShuffleExchange.prepareShuffleDependency(
>       child.execute(), child.output, SinglePartition, serializer))
>   shuffled.mapPartitionsInternal(_.take(limit))
> }
>
> Thus, there is no way to avoid processing all data before the shuffle. I
> think that is the reason. Do I understand correctly?
>
> Thanks.
>
> Zhan Zhang
> On Apr 18, 2016, at 10:08 PM, Reynold Xin <r...@databricks.com> wrote:
>
> Unless I'm really missing something, I don't think so. As I said, it goes
> through an iterator, and after processing each stream-side row we do a
> shouldStop check. The generated code looks like
>
>   protected void processNext() throws java.io.IOException {
>     /*** PRODUCE: Project [id#79L] */
>
>     /*** PRODUCE: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */
>
>     /*** PRODUCE: Range 0, 1, 8, 100, [id#79L] */
>
>     // initialize Range
>     if (!range_initRange) {
>       range_initRange = true;
>       initRange(partitionIndex);
>     }
>
>     while (!range_overflow && range_number < range_partitionEnd) {
>       long range_value = range_number;
>       range_number += 1L;
>       if (range_number < range_value ^ 1L < 0) {
>         range_overflow = true;
>       }
>
>       /*** CONSUME: BroadcastHashJoin [id#79L], [id#82L], Inner, BuildRight, None */
>
>       // generate join key for stream side
>
>       // find matches from HashedRelation
>       UnsafeRow bhj_matched = false ? null: (UnsafeRow)bhj_relation.getValue(range_value);
>       if (bhj_matched == null) continue;
>
>       bhj_metricValue.add(1);
>
>       /*** CONSUME: Project [id#79L] */
>
>       System.out.println("i got one row");
>
>       /*** CONSUME: WholeStageCodegen */
>
>       project_rowWriter.write(0, range_value);
>       append(project_result);
>
>       *if (shouldStop()) return;*
>     }
>   }
> }
>
>
> shouldStop is true once we go past the limit.
>
>
>
> On Mon, Apr 18, 2016 at 9:44 PM, Zhan Zhang <zzh...@hortonworks.com>
> wrote:
>
>> From the physical plan, the limit is one level above the
>> WholeStageCodegen. Thus, I don’t think shouldStop would work here. To make
>> it work, the limit has to be part of the WholeStageCodegen.
>>
>> Correct me if I am wrong.
>>
>> Thanks.
>>
>> Zhan Zhang
>>
>> On Apr 18, 2016, at 11:09 AM, Reynold Xin <r...@databricks.com> wrote:
>>
>> I could be wrong but I think we currently do that through whole stage
>> codegen. After processing every row on the stream side, the generated code
>> for broadcast join checks whether it has hit the limit or not (through this
>> thing called shouldStop).
>>
>> It is not the optimal solution, because a single stream-side row
>> might output multiple hits, but it is usually not a problem.
>>
>>
>> On Mon, Apr 18, 2016 at 10:46 AM, Andrew Ray <ray.and...@gmail.com>
>> wrote:
>>
>>> While you can't automatically push the limit *through* the join, we
>>> could push it *into* the join (stop processing after generating 10
>>> records). I believe that is what Rajesh is suggesting.
>>>
>>> On Tue, Apr 12, 2016 at 7:46 AM, Herman van Hövell tot Westerflier <
>>> hvanhov...@questtec.nl> wrote:
>>>
>>>> I am not sure if you can push a limit through a join. This becomes
>>>> problematic if not all keys are present on both sides; in such a case a
>>>> limit can produce fewer rows than the set limit.
>>>>
>>>> This might be a rare case in which whole stage codegen is slower, due
>>>> to the fact that we need to buffer the result of such a stage. You could
>>>> try to disable it by setting "spark.sql.codegen.wholeStage" to false.
>>>>
>>>> 2016-04-12 14:32 GMT+02:00 Rajesh Balamohan <rajesh.balamo...@gmail.com
>>>> >:
>>>>
>>>>> Hi,
>>>>>
>>>>> I ran the following query in Spark (latest master codebase) and it
>>>>> took a long time to complete even though it was a broadcast hash join.
>>>>>
>>>>> It appears that the limit is applied only after the complete join has
>>>>> been computed. Shouldn't the limit be pushed into BroadcastHashJoin (so
>>>>> that it stops processing after generating 10 rows)? Please let me know
>>>>> if my understanding of this is wrong.
>>>>>
>>>>>
>>>>> select l_partkey from lineitem, partsupp where ps_partkey=l_partkey
>>>>> limit 10;
>>>>>
>>>>> >>>>
>>>>> | == Physical Plan ==
>>>>> CollectLimit 10
>>>>> +- WholeStageCodegen
>>>>>    :  +- Project [l_partkey#893]
>>>>>    :     +- BroadcastHashJoin [l_partkey#893], [ps_partkey#908], Inner, BuildRight, None
>>>>>    :        :- Project [l_partkey#893]
>>>>>    :        :  +- Filter isnotnull(l_partkey#893)
>>>>>    :        :     +- Scan HadoopFiles[l_partkey#893] Format: ORC, PushedFilters: [IsNotNull(l_partkey)], ReadSchema: struct<l_partkey:int>
>>>>>    :        +- INPUT
>>>>>    +- BroadcastExchange HashedRelationBroadcastMode(true,List(cast(ps_partkey#908 as bigint)),List(ps_partkey#908))
>>>>>       +- WholeStageCodegen
>>>>>          :  +- Project [ps_partkey#908]
>>>>>          :     +- Filter isnotnull(ps_partkey#908)
>>>>>          :        +- Scan HadoopFiles[ps_partkey#908] Format: ORC, PushedFilters: [IsNotNull(ps_partkey)], ReadSchema: struct<ps_partkey:int>
>>>>>  |
>>>>> >>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> ~Rajesh.B
>>>>>
>>>>
>>>>
>>>
>>
>>
>
>
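
To make the disagreement in the thread above concrete, here is a minimal Java sketch. It is not Spark code. It assumes a toy pull-based iterator (the hypothetical JoinIterator) whose processNext loop has the same shape as the generated code quoted above: it buffers one matching row and returns as soon as its own shouldStop() sees a buffered row. take models a limit that pulls rows lazily, while drainThenTake models a limit that is applied only after every row has been produced, as would happen behind a shuffle into a single partition. All class and method names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

class EarlyStopSketch {

    /** Toy stage: streams 0..streamSize-1 and emits rows whose key is on the build side. */
    static final class JoinIterator implements Iterator<Long> {
        private final long streamSize;
        private final Set<Long> buildKeys;
        private final ArrayDeque<Long> buffer = new ArrayDeque<>();
        private long nextStreamRow = 0;
        long rowsProcessed = 0; // how many stream-side rows were touched

        JoinIterator(long streamSize, Set<Long> buildKeys) {
            this.streamSize = streamSize;
            this.buildKeys = buildKeys;
        }

        // Assumption of this sketch: stop as soon as one output row is buffered.
        private boolean shouldStop() {
            return !buffer.isEmpty();
        }

        private void processNext() {
            while (nextStreamRow < streamSize) {
                long value = nextStreamRow++;
                rowsProcessed++;
                if (!buildKeys.contains(value)) continue; // no match on the build side
                buffer.add(value);
                if (shouldStop()) return; // hand control back to the consumer
            }
        }

        @Override
        public boolean hasNext() {
            if (buffer.isEmpty()) processNext();
            return !buffer.isEmpty();
        }

        @Override
        public Long next() {
            if (!hasNext()) throw new NoSuchElementException();
            return buffer.poll();
        }
    }

    /** Lazy limit: stops pulling once enough rows have been collected. */
    static List<Long> take(Iterator<Long> it, int limit) {
        List<Long> out = new ArrayList<>();
        while (out.size() < limit && it.hasNext()) out.add(it.next());
        return out;
    }

    /** Limit after full materialization: every row is produced first. */
    static List<Long> drainThenTake(Iterator<Long> it, int limit) {
        List<Long> all = new ArrayList<>();
        while (it.hasNext()) all.add(it.next());
        return new ArrayList<>(all.subList(0, Math.min(limit, all.size())));
    }

    /** Helper: the even keys in [0, n). */
    static Set<Long> evenKeys(long n) {
        Set<Long> keys = new HashSet<>();
        for (long k = 0; k < n; k += 2) keys.add(k);
        return keys;
    }

    public static void main(String[] args) {
        JoinIterator lazy = new JoinIterator(1000, evenKeys(1000));
        System.out.println(take(lazy, 10) + " after " + lazy.rowsProcessed + " stream rows");
        JoinIterator eager = new JoinIterator(1000, evenKeys(1000));
        System.out.println(drainThenTake(eager, 10) + " after " + eager.rowsProcessed + " stream rows");
    }
}
```

With 1000 stream rows and the even keys on the build side, the lazy path touches 19 stream rows to return 10 matches, while the drain-first path touches all 1000. Which of the two paths the query in this thread actually takes is exactly what is being debated above; the sketch only shows why the answer matters.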

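Herman's point earlier in the thread, that pushing a limit through a join can produce fewer rows than the limit when not all keys are present on both sides, can be illustrated with a small Java sketch on plain collections. It is not Spark code, and the class and method names (LimitPushdownSketch, innerJoinKeys, limitAfterJoin, limitBeforeJoin) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class LimitPushdownSketch {

    /** Inner join on the key itself: keep left rows whose key exists on the right. */
    static List<Integer> innerJoinKeys(List<Integer> left, Set<Integer> rightKeys) {
        List<Integer> out = new ArrayList<>();
        for (Integer key : left) {
            if (rightKeys.contains(key)) out.add(key);
        }
        return out;
    }

    /** First n rows of the input (or all of them if there are fewer). */
    static List<Integer> limit(List<Integer> rows, int n) {
        return new ArrayList<>(rows.subList(0, Math.min(n, rows.size())));
    }

    /** The query as written: join first, then limit. */
    static List<Integer> limitAfterJoin(List<Integer> left, Set<Integer> right, int n) {
        return limit(innerJoinKeys(left, right), n);
    }

    /** Limit pushed through the join onto the left input: not equivalent. */
    static List<Integer> limitBeforeJoin(List<Integer> left, Set<Integer> right, int n) {
        return innerJoinKeys(limit(left, n), right);
    }

    public static void main(String[] args) {
        List<Integer> left = Arrays.asList(1, 2, 3, 4, 5, 6);
        Set<Integer> right = new HashSet<>(Arrays.asList(4, 5, 6));
        System.out.println("limit after join:  " + limitAfterJoin(left, right, 2));
        System.out.println("limit before join: " + limitBeforeJoin(left, right, 2));
    }
}
```

Here the first two left rows (keys 1 and 2) have no partner on the right, so limiting before the join yields no rows at all, while limiting after the join yields two (keys 4 and 5). Stopping the join itself after enough output rows, as Andrew suggests, does not have this problem because the limit still counts joined rows.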