Multiple column aggregations

2019-02-08 Thread Sonu Jyotshna
Hello,

I have a requirement where I need to group by multiple columns and
aggregate them, but not at the same time. I have a structure that contains
accountid, some other columns, and orderid. I need to cover scenarios like
an account having multiple orders, so a group by account with an
aggregation works there, and I also need to find orderids associated with
multiple accounts, so a group by orderid works there. But for better
performance at the dataset level, can we do it in a single step where both
work, or is there a better approach I can follow? Can you help?


Regards,
Sonu
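
A single shuffle cannot group by two different keys at once, but both
aggregations can share one cached scan of the data. A minimal sketch,
assuming a DataFrame with columns literally named accountid and orderid
(the input path and thresholds are illustrative only):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical source; replace with the real location of the dataset.
df = spark.read.parquet("s3://bucket/orders/")

# Cache so that both aggregations reuse a single scan of the input.
df.cache()

# Accounts having more than one order.
accounts_with_many_orders = (
    df.groupBy("accountid")
      .agg(F.countDistinct("orderid").alias("n_orders"))
      .filter(F.col("n_orders") > 1)
)

# Orders associated with more than one account.
orders_with_many_accounts = (
    df.groupBy("orderid")
      .agg(F.countDistinct("accountid").alias("n_accounts"))
      .filter(F.col("n_accounts") > 1)
)

Each grouping key still needs its own shuffle, but the cache avoids reading
the source twice.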


Pyspark elementwise matrix multiplication

2019-02-08 Thread Simon Dirmeier

Dear all,

I wonder if there is a way to take the element-wise product of two matrices
(RowMatrix, DistributedMatrix, ...) in PySpark?

I cannot find a good answer/API entry on the topic.

Thank you for all the help.


Best,
Simon
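
As far as I know there is no built-in element-wise (Hadamard) product for
the distributed matrix types in pyspark.mllib. One workaround sketch,
assuming both operands are CoordinateMatrix instances with identical
dimensions, is to key the entries by (row, column) and join:

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

def hadamard(a, b):
    # a, b: CoordinateMatrix of the same shape.
    a_cells = a.entries.map(lambda e: ((e.i, e.j), e.value))
    b_cells = b.entries.map(lambda e: ((e.i, e.j), e.value))
    # Only cells present in both matrices survive the join; entries missing
    # from either side are implicitly zero, and their product is zero anyway.
    product = (a_cells.join(b_cells)
               .map(lambda kv: MatrixEntry(kv[0][0], kv[0][1],
                                           kv[1][0] * kv[1][1])))
    return CoordinateMatrix(product, a.numRows(), a.numCols())

A RowMatrix does not carry row indices, so it would first need something
like rows.zipWithIndex() to build the MatrixEntry records before this
approach applies.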


Re: Spark 2.4 partitions and tasks

2019-02-08 Thread Pedro Tuero
I did a repartition to 1 (hardcoded) before the keyBy and it finishes in
1.2 minutes.
The questions remain open, because I don't want to hardcode parallelism.
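
One way to avoid hardcoding is to carry the partition count of the previous
RDD forward explicitly. A sketch (previous_rdd, some_key, and merge_values
are placeholder names):

# Reuse the partition count of the RDD produced by the previous step instead
# of letting the shuffle fall back to spark.default.parallelism (128 here).
num_parts = previous_rdd.getNumPartitions()

keyed = previous_rdd.keyBy(lambda record: record.some_key)

# keyBy itself does not shuffle; pass the partition count to the operation
# that does (reduceByKey, groupByKey, join, etc. all accept numPartitions).
result = keyed.reduceByKey(merge_values, numPartitions=num_parts)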

On Fri, Feb 8, 2019 at 12:50 PM, Pedro Tuero (tuerope...@gmail.com)
wrote:

> 128 is the default parallelism defined for the cluster.
> The question now is why the keyBy operation is using the default parallelism
> instead of the number of partitions of the RDD created by the previous step
> (5580).
> Any clues?
>
> On Thu, Feb 7, 2019 at 3:30 PM, Pedro Tuero (
> tuerope...@gmail.com) wrote:
>
>> Hi,
>> I am running a job in Spark (using AWS EMR) and some stages are taking a
>> lot longer with Spark 2.4 than with Spark 2.3.1:
>>
>> Spark 2.4:
>> [image: image.png]
>>
>> Spark 2.3.1:
>> [image: image.png]
>>
>> With Spark 2.4, the keyBy operation takes more than 10X what it took with
>> Spark 2.3.1.
>> It seems to be related to the number of tasks / partitions.
>>
>> Questions:
>> - Isn't the number of tasks of a job supposed to be related to the number
>> of partitions of the RDD left by the previous job? Did that change in
>> version 2.4?
>> - Which tools / configuration may I try to reduce this aberrant performance
>> degradation?
>>
>> Thanks.
>> Pedro.
>>
>


Re: Aws

2019-02-08 Thread Pedro Tuero
Hi Noritaka,

I start clusters from the Java API.
Clusters running on 5.16 have no manual configurations in the EMR console
Configuration tab, so I assume the value of this property is the default
on 5.16.
I enabled maximize resource allocation because otherwise the number of
cores automatically assigned (without setting spark.executor.cores
manually) was always one per executor.

I already use the same configurations. I used the same scripts and
configuration files to run the same job on the same input data with the
same configuration, only changing the binaries with my own code, which
includes launching the clusters using the EMR 5.20 release label.

Anyway, setting maximize resource allocation seems to have helped enough
with the core distribution.
Some jobs even take less time than before.
Now I'm stuck analyzing a case where the number of tasks created seems to
be the problem. I have recently posted another thread about that in this
forum.

Regards,
Pedro
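
For reference, maximizeResourceAllocation is an EMR-level setting applied
through the "spark" configuration classification at cluster creation time.
The cluster here is started from the Java SDK; the sketch below shows the
same idea with boto3 in Python, and every name, role, and instance count in
it is illustrative only:

import boto3

emr = boto3.client("emr", region_name="us-east-1")  # region is an assumption

response = emr.run_job_flow(
    Name="spark-job",                  # illustrative name
    ReleaseLabel="emr-5.20.0",
    Applications=[{"Name": "Spark"}],
    Configurations=[
        {
            # EMR-specific classification documented in the AWS link quoted
            # later in this thread.
            "Classification": "spark",
            "Properties": {"maximizeResourceAllocation": "true"},
        }
    ],
    Instances={
        "InstanceGroups": [
            {"InstanceRole": "MASTER", "InstanceType": "r5.xlarge",
             "InstanceCount": 1, "Market": "ON_DEMAND"},
            {"InstanceRole": "CORE", "InstanceType": "r5.xlarge",
             "InstanceCount": 5, "Market": "ON_DEMAND"},
        ],
        "KeepJobFlowAliveWhenNoSteps": False,
    },
    JobFlowRole="EMR_EC2_DefaultRole",   # default EMR roles; adjust if needed
    ServiceRole="EMR_DefaultRole",
)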


On Thu, Feb 7, 2019 at 9:37 PM, Noritaka Sekiyama (
moomind...@gmail.com) wrote:

> Hi Pedro,
>
> It seems that you disabled maximize resource allocation in 5.16, but
> enabled it in 5.20.
> This config can differ based on how you start the EMR cluster (via the quick
> wizard, the advanced wizard in the console, or the CLI/API).
> You can see it in the EMR console Configuration tab.
>
> Please compare the Spark properties (especially spark.executor.cores,
> spark.executor.memory, spark.dynamicAllocation.enabled, etc.) between
> your two Spark clusters with different versions of EMR.
> You can see them in the Spark web UI's Environment tab or in the log files.
> Then please try with the same properties against the same dataset with the
> same deployment mode (cluster or client).
>
> Even in EMR, you can configure the number of cores and the memory of the
> driver/executors in config files, in spark-submit arguments, and inside the
> Spark app if you need to.
>
>
> Warm regards,
> Nori
>
> On Fri, Feb 8, 2019 at 8:16 AM, Hiroyuki Nagata  wrote:
>
>> Hi,
>> thank you Pedro
>>
>> I tested the maximizeResourceAllocation option. When it's enabled, it seems
>> Spark fully utilizes the cores. However, the performance is not so
>> different from the default setting.
>>
>> I'm considering using s3-distcp for uploading files. And I think
>> table (dataframe) caching is also effective.
>>
>> Regards,
>> Hiroyuki
>>
>> On Sat, Feb 2, 2019 at 1:12 AM, Pedro Tuero  wrote:
>>
>>> Hi Hiroyuki, thanks for the answer.
>>>
>>> I found a solution for the cores per executor configuration:
>>> I set this configuration to true:
>>>
>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation
>>> Probably it was true by default in version 5.16, but I couldn't find when
>>> it changed.
>>> In the same link, it says that dynamic allocation is true by default. I
>>> thought it would do the trick, but reading it again I think it is related
>>> to the number of executors rather than the number of cores.
>>>
>>> But the jobs are still taking more than before.
>>> Watching the application history, I see these differences:
>>> For the same job, the same kinds of instance types, and the default (AWS
>>> managed) configuration for executors, cores, and memory:
>>> Instances:
>>> 6 r5.xlarge: 4 vCPUs, 32 GB of memory each (so there are 24 cores: 6
>>> instances * 4 cores).
>>>
>>> With 5.16:
>>> - 24 executors (4 in each instance, including the one that also had the
>>> driver).
>>> - 4 cores each.
>>> - 2.7  * 2 (Storage + on-heap storage) memory each.
>>> - 1 executor per core, but at the same time  4 cores per executor (?).
>>> - Total Mem in executors per Instance : 21.6 (2.7 * 2 * 4)
>>> - Total Elapsed Time: 6 minutes
>>> With 5.20:
>>> - 5 executors (1 in each instance, 0 in the instance with the driver).
>>> - 4 cores each.
>>> - 11.9  * 2 (Storage + on-heap storage) memory each.
>>> - Total Mem  in executors per Instance : 23.8 (11.9 * 2 * 1)
>>> - Total Elapsed Time: 8 minutes
>>>
>>>
>>> I don't understand the configuration of 5.16, but it works better.
>>> It seems that in 5.20, a full instance is wasted with the driver only,
>>> while it could also contain an executor.
>>>
>>>
>>> Regards,
>>> Pedro.
>>>
>>>
>>>
>>> On Thu, Jan 31, 2019 at 8:16 PM, Hiroyuki Nagata
>>> wrote:
>>>
 Hi, Pedro


 I have also started using AWS EMR, with Spark 2.4.0. I'm looking for
 performance tuning methods.

 Do you configure dynamic allocation?

 FYI:

 https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

 I haven't tested it yet. I guess spark-submit needs to specify the number
 of executors.

 Regards,
 Hiroyuki

 On Fri, Feb 1, 2019 at 5:23 AM, Pedro Tuero (tuerope...@gmail.com) wrote:

> Hi guys,
> I usually run Spark jobs on AWS EMR.
> Recently I switched from AWS EMR release label 5.16 to 5.20 (which uses
> Spark 2.4.0).
> I've noticed that a lot of steps are taking longer than before.
> I think it is related to the automatic 

Spark 2.4 Regression with posexplode and structs

2019-02-08 Thread Andreas Weise
Hi,

after upgrading from 2.3.2 to 2.4.0 we noticed a regression when using
posexplode() in conjunction with selecting another struct's fields.

Given a structure like this:
===
>>> df = (spark.range(1)
... .withColumn("my_arr", array(lit("1"), lit("2")))
... .withColumn("bar", lit("1"))
... .select("id", "my_arr", struct("bar").alias("foo"))
... )
>>>
>>> df.printSchema()
root
 |-- id: long (nullable = false)
 |-- my_arr: array (nullable = false)
 ||-- element: string (containsNull = false)
 |-- foo: struct (nullable = false)
 ||-- bar: string (nullable = false)



Spark 2.3.2
===
>>>
>>> df = df.select(posexplode("my_arr"), "foo.bar")
>>>
>>> df.printSchema()
root
 |-- pos: integer (nullable = false)
 |-- col: string (nullable = false)
 |-- bar: string (nullable = false)


selecting "foo.bar" results in field "bar".


Spark 2.4.0
===
>>>
>>> df = df.select(posexplode("my_arr"), "foo.bar")
>>>
>>> df.printSchema()
root
 |-- pos: integer (nullable = false)
 |-- col: string (nullable = false)
 |-- foo.bar: string (nullable = false)


In 2.4 the column 'bar' now ends up named 'foo.bar', which is not what we
would expect.

So existing code doing .select("bar") will fail.

>>> df.select("bar").show()
Traceback (most recent call last):
  File "", line 1, in 
  File
"/home/andreas/Downloads/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/dataframe.py",
line 1320, in select
jdf = self._jdf.select(self._jcols(*cols))
  File
"/home/andreas/Downloads/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
line 1257, in __call__
  File
"/home/andreas/Downloads/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py",
line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve '`bar`' given input
columns: [pos, col, foo.bar];;\n'Project ['bar]\n+- Project [pos#14,
col#15, foo#9.bar AS foo.bar#16]\n   +- Generate posexplode(my_arr#2),
false, [pos#14, col#15]\n  +- Project [id#0L, my_arr#2,
named_struct(bar, bar#5) AS foo#9]\n +- Project [id#0L, my_arr#2, 1
AS bar#5]\n+- Project [id#0L, array(1, 2) AS my_arr#2]\n
   +- Range (0, 1, step=1, splits=Some(4))\n"


Is this a known issue / intended behavior?

Regards
Andreas
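
A possible workaround until this is clarified upstream: alias the nested
field explicitly, so the output column keeps the short name on both 2.3.x
and 2.4.0 (a sketch, not an official fix):

from pyspark.sql.functions import col, posexplode

df2 = df.select(posexplode("my_arr"), col("foo.bar").alias("bar"))
df2.printSchema()        # the last column is named 'bar'
df2.select("bar").show()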