[ https://issues.apache.org/jira/browse/SPARK-16320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15404369#comment-15404369 ]
Yin Huai commented on SPARK-16320: ---------------------------------- Can you also try https://github.com/apache/spark/pull/13701 and see if it makes any difference? > Spark 2.0 slower than 1.6 when querying nested columns > ------------------------------------------------------ > > Key: SPARK-16320 > URL: https://issues.apache.org/jira/browse/SPARK-16320 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.0.0 > Reporter: Maciej BryĆski > Priority: Critical > Attachments: spark1.6-ui.png, spark2-ui.png > > > I did some test on parquet file with many nested columns (about 30G in > 400 partitions) and Spark 2.0 is sometimes slower. > I tested following queries: > 1) {code}select count(*) where id > some_id{code} > In this query performance is similar. (about 1 sec) > 2) {code}select count(*) where nested_column.id > some_id{code} > Spark 1.6 -> 1.6 min > Spark 2.0 -> 2.1 min > Should I expect such a drop in performance ? > I don't know how to prepare sample data to show the problem. > Any ideas ? Or public data with many nested columns ? > *UPDATE* > I created script to generate data and to confirm this problem. > {code} > #Initialization > from pyspark import SparkContext, SparkConf > from pyspark.sql import HiveContext > from pyspark.sql.functions import struct > conf = SparkConf() > conf.set('spark.cores.max', 15) > conf.set('spark.executor.memory', '30g') > conf.set('spark.driver.memory', '30g') > sc = SparkContext(conf=conf) > sqlctx = HiveContext(sc) > #Data creation > MAX_SIZE = 2**32 - 1 > path = '/mnt/mfs/parquet_nested' > def create_sample_data(levels, rows, path): > > def _create_column_data(cols): > import random > random.seed() > return {"column{}".format(i): random.randint(0, MAX_SIZE) for i in > range(cols)} > > def _create_sample_df(cols, rows): > rdd = sc.parallelize(range(rows)) > data = rdd.map(lambda r: _create_column_data(cols)) > df = sqlctx.createDataFrame(data) > return df > > def _create_nested_data(levels, rows): > if len(levels) == 1: > return _create_sample_df(levels[0], rows).cache() > else: > df = _create_nested_data(levels[1:], rows) > return df.select([struct(df.columns).alias("column{}".format(i)) > for i in range(levels[0])]) > df = _create_nested_data(levels, rows) > df.write.mode('overwrite').parquet(path) > > #Sample data > create_sample_data([2,10,200], 1000000, path) > #Query > df = sqlctx.read.parquet(path) > %%timeit > df.where("column1.column5.column50 > {}".format(int(MAX_SIZE / 2))).count() > {code} > Results > Spark 1.6 > 1 loop, best of 3: *1min 5s* per loop > Spark 2.0 > 1 loop, best of 3: *1min 21s* per loop > *UPDATE 2* > Analysis in https://issues.apache.org/jira/browse/SPARK-16321 direct to same > source. > I attached some VisualVM profiles there. > Most interesting are from queries. > https://issues.apache.org/jira/secure/attachment/12818785/spark16_query.nps > https://issues.apache.org/jira/secure/attachment/12818784/spark2_query.nps -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org