Multiple column aggregations
Hello, I have a requirement where I need to group by multiple columns and aggregate them, but not at the same time. I have a structure which contains accountid, some other columns, and orderid. I need to cover scenarios like an account having multiple orders, so grouping by accountid and aggregating works there; but I also need to find orderids associated with multiple accounts, so grouping by orderid would work for that. For better performance at the dataset level, can this be done in a single step where both work, or is there a better approach I can follow? Can you help? Regards, Sonu
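One way to cover both groupings without scanning the full dataset twice (a sketch only; the column names accountid/orderid and the input path are assumptions) is to reduce the data to the distinct (accountid, orderid) pairs once and derive both aggregates from that smaller intermediate:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Assumed input: a DataFrame with accountid, orderid and other columns.
orders = spark.read.parquet("s3://bucket/orders")  # placeholder path

# The distinct pairs are computed once and cached, so both groupings
# read this smaller intermediate instead of the full dataset.
pairs = orders.select("accountid", "orderid").distinct().cache()

# Accounts having multiple orders
accounts_with_many_orders = (pairs
    .groupBy("accountid")
    .agg(F.countDistinct("orderid").alias("n_orders"))
    .filter("n_orders > 1"))

# Order ids associated with multiple accounts
orders_with_many_accounts = (pairs
    .groupBy("orderid")
    .agg(F.countDistinct("accountid").alias("n_accounts"))
    .filter("n_accounts > 1"))

Whether caching the pair set pays off depends on its size relative to the full dataset; the two groupBys themselves still run as separate aggregations.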
Pyspark elementwise matrix multiplication
Dear all, I wonder if there is a way to take the element-wise product of two matrices (RowMatrix, DistributedMatrix, ...) in PySpark? I cannot find a good answer or API entry on the topic. Thank you for all the help. Best, Simon
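There does not seem to be a dedicated element-wise (Hadamard) product in the distributed matrix APIs. One possible sketch, assuming both inputs are IndexedRowMatrix instances of the same shape with matching row indices, is to join the rows on their index and multiply the corresponding vectors locally:

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

def elementwise_product(mat_a, mat_b):
    # Pair each row vector with its index, join the two matrices on the
    # index, and multiply the vectors element-wise (as numpy arrays).
    a = mat_a.rows.map(lambda r: (r.index, r.vector.toArray()))
    b = mat_b.rows.map(lambda r: (r.index, r.vector.toArray()))
    rows = a.join(b).map(
        lambda kv: IndexedRow(kv[0], Vectors.dense(kv[1][0] * kv[1][1])))
    return IndexedRowMatrix(rows)

For a RowMatrix the rows carry no index, so they would first need to be paired with one (e.g. via rdd.zipWithIndex()) before a join like this is possible.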
Re: Spark 2.4 partitions and tasks
I did a repartition to 1 (hardcoded) before the keyBy and it ends in 1.2 minutes. The questions remain open, because I don't want to hardcode the parallelism.

On Fri, Feb 8, 2019 at 12:50, Pedro Tuero (tuerope...@gmail.com) wrote:

> 128 is the default parallelism defined for the cluster.
> The question now is why the keyBy operation is using the default parallelism
> instead of the number of partitions of the RDD created by the previous step
> (5580).
> Any clues?
>
> On Thu, Feb 7, 2019 at 15:30, Pedro Tuero (tuerope...@gmail.com) wrote:
>
>> Hi,
>> I am running a job in Spark (using AWS EMR) and some stages are taking a
>> lot longer with Spark 2.4 than with Spark 2.3.1:
>>
>> Spark 2.4:
>> [image: image.png]
>>
>> Spark 2.3.1:
>> [image: image.png]
>>
>> With Spark 2.4, the keyBy operation takes more than 10X what it took with
>> Spark 2.3.1.
>> It seems to be related to the number of tasks / partitions.
>>
>> Questions:
>> - Isn't the number of tasks of a job supposed to be related to the number
>> of partitions of the RDD left by the previous job? Did that change in
>> version 2.4?
>> - Which tools/configuration may I try to reduce this aberrant downgrade
>> of performance?
>>
>> Thanks.
>> Pedro.
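If the goal is simply to avoid hard-coding a number, one option (a sketch; records and merge_fn are placeholders for the actual RDD and reduce function) is to read the partition count off the upstream RDD and pass it to the shuffle explicitly, rather than letting it fall back to spark.default.parallelism:

# Derive the shuffle width from the RDD produced by the previous step.
parts = records.getNumPartitions()     # e.g. 5580 in this case
keyed = records.keyBy(lambda r: r[0])  # keyBy itself does not repartition
result = keyed.reduceByKey(merge_fn, numPartitions=parts)  # explicit width

This keeps the parallelism tied to whatever the previous stage produced instead of a fixed constant.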
Re: Spark 2.4 partitions and tasks
128 is the default parallelism defined for the cluster. The question now is why the keyBy operation is using the default parallelism instead of the number of partitions of the RDD created by the previous step (5580). Any clues?

On Thu, Feb 7, 2019 at 15:30, Pedro Tuero (tuerope...@gmail.com) wrote:

> Hi,
> I am running a job in Spark (using AWS EMR) and some stages are taking a
> lot longer with Spark 2.4 than with Spark 2.3.1:
>
> Spark 2.4:
> [image: image.png]
>
> Spark 2.3.1:
> [image: image.png]
>
> With Spark 2.4, the keyBy operation takes more than 10X what it took with
> Spark 2.3.1.
> It seems to be related to the number of tasks / partitions.
>
> Questions:
> - Isn't the number of tasks of a job supposed to be related to the number
> of partitions of the RDD left by the previous job? Did that change in
> version 2.4?
> - Which tools/configuration may I try to reduce this aberrant downgrade
> of performance?
>
> Thanks.
> Pedro.
Re: Aws
Hi Noritaka,

I start clusters from the Java API. Clusters running on 5.16 have no manual configurations in the EMR console Configuration tab, so I assume the value of this property is the default on 5.16.
I enabled maximize resource allocation because otherwise the number of cores automatically assigned (without setting spark.executor.cores manually) was always one per executor.
I already use the same configurations: I used the same scripts and configuration files for running the same job with the same input data and the same configuration, only changing the binaries with my own code, which includes launching the clusters using the EMR 5.20 release label.
Anyway, setting maximize resource allocation seems to have helped enough with the core distribution. Some jobs take even less time than before.
Now I'm stuck analyzing a case where the number of tasks created seems to be the problem. I have posted another thread about that in this forum recently.

Regards,
Pedro

On Thu, Feb 7, 2019 at 21:37, Noritaka Sekiyama (moomind...@gmail.com) wrote:

> Hi Pedro,
>
> It seems that you disabled maximize resource allocation in 5.16, but
> enabled it in 5.20.
> This config can differ based on how you start the EMR cluster (via the
> quick wizard, the advanced wizard in the console, or the CLI/API).
> You can see it in the EMR console Configuration tab.
>
> Please compare the Spark properties (especially spark.executor.cores,
> spark.executor.memory, spark.dynamicAllocation.enabled, etc.) between
> your two Spark clusters with different versions of EMR.
> You can see them in the Spark web UI's Environment tab or in the log files.
> Then please try with the same properties against the same dataset with the
> same deployment mode (cluster or client).
>
> Even in EMR, you can configure the number of cores and the memory of the
> driver/executors in config files, in arguments to spark-submit, and inside
> the Spark app if you need to.
>
> Warm regards,
> Nori
>
> On Fri, Feb 8, 2019 at 8:16, Hiroyuki Nagata wrote:
>
>> Hi,
>> thank you Pedro.
>>
>> I tested the maximizeResourceAllocation option. When it's enabled, it
>> seems Spark utilizes its cores fully. However, the performance is not
>> very different from the default setting.
>>
>> I'm considering using s3-dist-cp for uploading files. And I think
>> table (dataframe) caching is also effective.
>>
>> Regards,
>> Hiroyuki
>>
>> On Sat, Feb 2, 2019 at 1:12, Pedro Tuero wrote:
>>
>>> Hi Hiroyuki, thanks for the answer.
>>>
>>> I found a solution for the cores-per-executor configuration:
>>> I set this configuration to true:
>>> https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html#emr-spark-maximizeresourceallocation
>>> It was probably true by default in version 5.16, but I didn't find when
>>> that changed.
>>> The same link says that dynamic allocation is true by default. I
>>> thought it would do the trick, but reading again I think it is related to
>>> the number of executors rather than the number of cores.
>>>
>>> But the jobs are still taking longer than before.
>>> Watching the application history, I see these differences:
>>> For the same job, the same instance types, and the default (AWS
>>> managed) configuration for executors, cores, and memory:
>>>
>>> Instances:
>>> 6 r5.xlarge: 4 vCPU, 32 GB of memory each (so there are 24 cores:
>>> 6 instances * 4 cores).
>>>
>>> With 5.16:
>>> - 24 executors (4 on each instance, including the one that also had the driver).
>>> - 4 cores each.
>>> - 2.7 * 2 (storage + on-heap storage) memory each.
>>> - 1 executor per core, but at the same time 4 cores per executor (?).
>>> - Total memory in executors per instance: 21.6 (2.7 * 2 * 4).
>>> - Total elapsed time: 6 minutes.
>>>
>>> With 5.20:
>>> - 5 executors (1 on each instance, 0 on the instance with the driver).
>>> - 4 cores each.
>>> - 11.9 * 2 (storage + on-heap storage) memory each.
>>> - Total memory in executors per instance: 23.8 (11.9 * 2 * 1).
>>> - Total elapsed time: 8 minutes.
>>>
>>> I don't understand the configuration of 5.16, but it works better.
>>> It seems that in 5.20 a full instance is wasted on the driver alone,
>>> while it could also host an executor.
>>>
>>> Regards,
>>> Pedro.
>>>
>>> On Thu, Jan 31, 2019 at 20:16, Hiroyuki Nagata wrote:
>>>
>>>> Hi, Pedro
>>>>
>>>> I also started using AWS EMR, with Spark 2.4.0. I'm looking for
>>>> performance tuning methods.
>>>>
>>>> Do you configure dynamic allocation?
>>>>
>>>> FYI:
>>>> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>>>>
>>>> I've not tested it yet. I guess spark-submit needs to specify the
>>>> number of executors.
>>>>
>>>> Regards,
>>>> Hiroyuki
>>>>
>>>> On Fri, Feb 1, 2019 at 5:23, Pedro Tuero (tuerope...@gmail.com) wrote:
>>>>
>>>>> Hi guys,
>>>>> I usually run Spark jobs on AWS EMR.
>>>>> Recently I switched from AWS EMR label 5.16 to 5.20 (which uses Spark 2.4.0).
>>>>> I've noticed that a lot of steps are taking longer than before.
>>>>> I think it is related to the automatic
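One way to take the EMR defaults out of the 5.16 vs 5.20 comparison would be to pin the executor layout explicitly, as Noritaka mentioned, either via spark-submit arguments or inside the app. A sketch with purely illustrative values for 6 r5.xlarge nodes (not recommendations):

from pyspark.sql import SparkSession

# Illustrative sizing only; the right numbers depend on the instance type
# and on whether maximizeResourceAllocation is enabled on the cluster.
spark = (SparkSession.builder
    .appName("emr-executor-sizing")
    .config("spark.executor.instances", "6")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memory", "18g")
    .config("spark.dynamicAllocation.enabled", "false")
    .getOrCreate())

With the layout fixed like this, any remaining runtime difference between the two EMR releases is easier to attribute to Spark itself rather than to the default allocation.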
Spark 2.4 Regression with posexplode and structs
Hi,

after upgrading from 2.3.2 to 2.4.0 we noticed a regression when using posexplode() in conjunction with selecting fields of another struct.

Given a structure like this:

>>> df = (spark.range(1)
...     .withColumn("my_arr", array(lit("1"), lit("2")))
...     .withColumn("bar", lit("1"))
...     .select("id", "my_arr", struct("bar").alias("foo"))
... )
>>>
>>> df.printSchema()
root
 |-- id: long (nullable = false)
 |-- my_arr: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- foo: struct (nullable = false)
 |    |-- bar: string (nullable = false)

Spark 2.3.2
===========
>>> df = df.select(posexplode("my_arr"), "foo.bar")
>>> df.printSchema()
root
 |-- pos: integer (nullable = false)
 |-- col: string (nullable = false)
 |-- bar: string (nullable = false)

Selecting "foo.bar" results in a field named "bar".

Spark 2.4.0
===========
>>> df = df.select(posexplode("my_arr"), "foo.bar")
>>> df.printSchema()
root
 |-- pos: integer (nullable = false)
 |-- col: string (nullable = false)
 |-- foo.bar: string (nullable = false)

In 2.4, 'bar' now gets the name 'foo.bar', which is not what we would expect. So existing code having .select("bar") will fail:

>>> df.select("bar").show()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/andreas/Downloads/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 1320, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/home/andreas/Downloads/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/andreas/Downloads/spark-2.4.0-bin-hadoop2.7/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve '`bar`' given input columns: [pos, col, foo.bar];;\n'Project ['bar]\n+- Project [pos#14, col#15, foo#9.bar AS foo.bar#16]\n   +- Generate posexplode(my_arr#2), false, [pos#14, col#15]\n      +- Project [id#0L, my_arr#2, named_struct(bar, bar#5) AS foo#9]\n         +- Project [id#0L, my_arr#2, 1 AS bar#5]\n            +- Project [id#0L, array(1, 2) AS my_arr#2]\n               +- Range (0, 1, step=1, splits=Some(4))\n"

Is this a known issue / intended behavior?

Regards
Andreas
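One workaround that appears to restore the old column name (not a statement on whether the 2.4 behavior is intended) is to alias the nested field explicitly instead of selecting it by its dotted path:

>>> from pyspark.sql.functions import col, posexplode
>>> # Explicit alias: the resulting column is named "bar" on both 2.3.x and 2.4.0
>>> df = df.select(posexplode("my_arr"), col("foo.bar").alias("bar"))
>>> df.select("bar").show()

With the explicit alias, .select("bar") resolves the same way on both versions.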