I think the slowness is caused by the generated aggregate method growing past 8K bytecodes: the JVM does not JIT-compile methods that large, so the aggregation runs interpreted and becomes much slower.
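You can check whether the JIT is really skipping that method by turning on compilation logging when you launch the shell. A minimal sketch (the exact wording of the skip message depends on the JVM, so treat the "hot method too big" pattern below as an assumption):

    # launch with JIT compilation logging on the driver and the executors
    pyspark \
      --driver-java-options -XX:+PrintCompilation \
      --conf spark.executor.extraJavaOptions=-XX:+PrintCompilation

Then rerun the 58-column sum and look for the generated aggregate method in lines like "COMPILE SKIPPED: hot method too big" in the driver and executor stdout.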
Could you try disabling DontCompileHugeMethods by passing: -XX:-DontCompileHugeMethods
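For a pyspark shell that could look like the following sketch (spark-submit takes the same options; adjust for your deploy mode):

    # re-enable JIT compilation of methods above the huge-method limit
    pyspark \
      --driver-java-options -XX:-DontCompileHugeMethods \
      --conf spark.executor.extraJavaOptions=-XX:-DontCompileHugeMethods

If the bytecode limit is the cause, the jump after 57 sum columns should go away, at the cost of spending JIT time on other oversized methods.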
On Mon, Sep 5, 2016 at 4:21 AM, Сергей Романов <romano...@inbox.ru.invalid> wrote:

> Hi, Gavin,
>
> Shuffling is exactly the same in both requests and is minimal. Both requests
> produce one shuffle task. Running time is the only difference I can see in
> the metrics:
>
> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
> schema=schema).groupBy().sum(*(['dd_convs'] * 57) ).collect, number=1)
> 0.713730096817
>
> {
>   "id" : 368,
>   "name" : "duration total (min, med, max)",
>   "value" : "524"
> }, {
>   "id" : 375,
>   "name" : "internal.metrics.executorRunTime",
>   "value" : "527"
> }, {
>   "id" : 391,
>   "name" : "internal.metrics.shuffle.write.writeTime",
>   "value" : "244495"
> }
>
> timeit.timeit(spark.read.csv('file:///data/dump/test_csv',
> schema=schema).groupBy().sum(*(['dd_convs'] * 58) ).collect, number=1)
> 2.97951102257
>
> }, {
>   "id" : 469,
>   "name" : "duration total (min, med, max)",
>   "value" : "2654"
> }, {
>   "id" : 476,
>   "name" : "internal.metrics.executorRunTime",
>   "value" : "2661"
> }, {
>   "id" : 492,
>   "name" : "internal.metrics.shuffle.write.writeTime",
>   "value" : "371883"
> }, {
>
> Full metrics in attachment.
>
> Saturday, September 3, 2016, 19:53 +03:00 from Gavin Yue
> <yue.yuany...@gmail.com>:
>
> Any shuffling?
>
> On Sep 3, 2016, at 5:50 AM, Сергей Романов <romano...@inbox.ru.INVALID>
> wrote:
>
> The same problem happens with a CSV data file, so it's not parquet-related
> either.
>
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>       /_/
>
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
> >>> import timeit
> >>> from pyspark.sql.types import *
> >>> schema = StructType([StructField('dd_convs', FloatType(), True)])
> >>> for x in range(50, 70): print x, timeit.timeit(spark.read.csv('file:///data/dump/test_csv', schema=schema).groupBy().sum(*(['dd_convs'] * x) ).collect, number=1)
> ...
> 50 0.372850894928
> 51 0.376906871796
> 52 0.381325960159
> 53 0.385444164276
> 54 0.386877775192
> 55 0.388918161392
> 56 0.397624969482
> 57 0.391713142395
> 58 2.62714004517
> 59 2.68421196938
> 60 2.74627685547
> 61 2.81081581116
> 62 3.43532109261
> 63 3.07742786407
> 64 3.03904604912
> 65 3.01616096497
> 66 3.06293702126
> 67 3.09386610985
> 68 3.27610206604
> 69 3.2041969299
>
> Saturday, September 3, 2016, 15:40 +03:00 from Сергей Романов
> <romano...@inbox.ru.INVALID>:
>
> Hi,
>
> I have narrowed my problem down to a very simple case. I'm sending a 27KB
> parquet file in the attachment (file:///data/dump/test2 in the example).
>
> Please, can you take a look at it? Why is there a performance drop after 57
> sum columns?
>
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
>       /_/
>
> Using Python version 2.7.6 (default, Jun 22 2015 17:58:13)
> SparkSession available as 'spark'.
> >>> import timeit
> >>> for x in range(70): print x, timeit.timeit(spark.read.parquet('file:///data/dump/test2').groupBy().sum(*(['dd_convs'] * x) ).collect, number=1)
> ...
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
> 0 1.05591607094
> 1 0.200426101685
> 2 0.203800916672
> 3 0.176458120346
> 4 0.184863805771
> 5 0.232321023941
> 6 0.216032981873
> 7 0.201778173447
> 8 0.292424917221
> 9 0.228524923325
> 10 0.190534114838
> 11 0.197028160095
> 12 0.270443916321
> 13 0.429781913757
> 14 0.270851135254
> 15 0.776989936829
> 16 0.233337879181
> 17 0.227638959885
> 18 0.212944030762
> 19 0.2144780159
> 20 0.22200012207
> 21 0.262261152267
> 22 0.254227876663
> 23 0.275084018707
> 24 0.292124032974
> 25 0.280488014221
> 16/09/03 15:31:28 WARN Utils: Truncated the string representation of a plan
> since it was too large. This behavior can be adjusted by setting
> 'spark.debug.maxToStringFields' in SparkEnv.conf.
> 26 0.290093898773
> 27 0.238478899002
> 28 0.246420860291
> 29 0.241401195526
> 30 0.255286931992
> 31 0.42702794075
> 32 0.327946186066
> 33 0.434395074844
> 34 0.314198970795
> 35 0.34576010704
> 36 0.278323888779
> 37 0.289474964142
> 38 0.290827989578
> 39 0.376291036606
> 40 0.347742080688
> 41 0.363158941269
> 42 0.318687915802
> 43 0.376327991486
> 44 0.374994039536
> 45 0.362971067429
> 46 0.425967931747
> 47 0.370860099792
> 48 0.443903923035
> 49 0.374128103256
> 50 0.378985881805
> 51 0.476850986481
> 52 0.451028823853
> 53 0.432540893555
> 54 0.514838933945
> 55 0.53990483284
> 56 0.449142932892
> 57 0.465240001678 // 5x slower after 57 columns
> 58 2.40412116051
> 59 2.41632795334
> 60 2.41812801361
> 61 2.55726218224
> 62 2.55484509468
> 63 2.56128406525
> 64 2.54642391205
> 65 2.56381797791
> 66 2.56871509552
> 67 2.66187620163
> 68 2.63496208191
> 69 2.81545996666
>
> Sergei Romanov
>
> <bad.csv.tgz>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org