thanks Michael,

That worked.
But what's puzzling is that if I run the exact same code against a temp
table created from Parquet versus a cached table, the cached version is much
slower: 5-10 seconds uncached vs. 47-60 seconds cached.

Any ideas why?

Here's my code snippet:
from pyspark.sql.functions import struct, min

# min of the (dt, product) struct keeps, per customer_id, the row with the
# earliest dt (ties broken by product)
df = data.select("customer_id", struct('dt', 'product').alias("vs"))\
  .groupBy("customer_id")\
  .agg(min("vs").alias("final"))\
  .select("customer_id", "final.dt", "final.product")
df.head()

My log from the non-cached run:
http://pastebin.com/F88sSv1B

Log from the cached run:
http://pastebin.com/Pmmfea3d

thanks,
imran

On Fri, Apr 8, 2016 at 12:33 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> You need to use the struct function
> <https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#pyspark.sql.functions.struct>
> (which creates an actual struct); you are trying to use the struct datatype
> (which just represents the schema of a struct).
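
A minimal sketch of the distinction, with placeholder column names: struct()
from pyspark.sql.functions builds an actual struct column that can be aliased
and aggregated, while StructType from pyspark.sql.types only describes a schema.

from pyspark.sql.functions import struct
from pyspark.sql.types import StructType, StructField, DateType, StringType

# a real struct column, usable in select()/agg()
vs_col = struct('dt', 'product').alias('vs')

# only a schema description; it has no .alias() and can't be passed to select()
vs_schema = StructType([StructField('dt', DateType()),
                        StructField('product', StringType())])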
>
> On Thu, Apr 7, 2016 at 3:48 PM, Imran Akbar <skunkw...@gmail.com> wrote:
>
>> thanks Michael,
>>
>>
>> I'm trying to implement the code in pyspark like so (where my dataframe
>> has 3 columns - customer_id, dt, and product):
>>
>> st = StructType().add("dt", DateType(), True).add("product",
>> StringType(), True)
>>
>> top = data.select("customer_id", st.alias('vs'))
>>   .groupBy("customer_id")
>>   .agg(max("dt").alias("vs"))
>>   .select("customer_id", "vs.dt", "vs.product")
>>
>> But I get an error saying:
>>
>> AttributeError: 'StructType' object has no attribute 'alias'
>>
>> Can I do this without aliasing the struct?  Or am I doing something
>> incorrectly?
>>
>>
>> regards,
>>
>> imran
>>
>> On Wed, Apr 6, 2016 at 4:16 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>>> Ordering for a struct goes in order of the fields.  So the max struct is
>>>> the one with the highest TotalValue (and then the highest category if there
>>>> are multiple entries with the same hour and total value).
>>>>
>>>> Is this due to "InterpretedOrdering" in StructType?
>>>>
>>>
>>> That is one implementation, but the code generated ordering also follows
>>> the same contract.
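
A minimal sketch of that ordering contract, assuming a DataFrame df with the
hour, category and TotalValue columns from the quoted example: because structs
compare field by field, max of a struct whose first field is TotalValue acts as
an argmax per group.

from pyspark.sql.functions import struct, max as max_

# max compares TotalValue first, then category on ties, so 'top' carries
# along the category of the winning row for each hour
argmax = df.groupBy('hour') \
    .agg(max_(struct('TotalValue', 'category')).alias('top')) \
    .select('hour', 'top.TotalValue', 'top.category')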
>>>
>>>
>>>
>>>>  4)  Is it faster doing it this way than doing a join or window
>>>> function in Spark SQL?
>>>>
>>>> Way faster.  This is a very efficient way to calculate argmax.
>>>>
>>>> Can you explain how this is way faster than a window function? I can
>>>> understand that a join doesn't make sense in this case. But to calculate the
>>>> grouping max, you just have to shuffle the data by grouping keys. You could
>>>> maybe do a combiner on the mapper side before shuffling, but that is it. Do
>>>> you mean the windowing function in Spark SQL won't do any map-side combining,
>>>> even if it is for max?
>>>>
>>>
>>> Windowing can't do partial aggregation and will have to collect all the
>>> data for a group so that it can be sorted before applying the function.  In
>>> contrast, a max aggregation will do partial aggregation (map-side combining)
>>> and can be calculated in a streaming fashion.
>>>
>>> Also, aggregation is more common and thus has seen more optimization
>>> beyond the theoretical limits described above.
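
A rough sketch of the two shapes being contrasted, assuming a recent PySpark
(1.6+ for row_number) and the same customer_id/dt/product DataFrame as above;
illustrative only, not a benchmark.

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number, struct, min as min_

# window version: every row for a customer_id is shuffled and sorted before
# row_number() can pick the first one, so there is no map-side combining
w = Window.partitionBy('customer_id').orderBy('dt')
via_window = data.withColumn('rn', row_number().over(w)) \
    .where(col('rn') == 1) \
    .drop('rn')

# aggregate version: min of the (dt, product) struct can be partially
# aggregated on each mapper before the shuffle
via_agg = data.groupBy('customer_id') \
    .agg(min_(struct('dt', 'product')).alias('final')) \
    .select('customer_id', 'final.dt', 'final.product')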
>>>
>>>
>>
>
