Re: Best way to deal with skewed partition sizes
Hi,

In the latest releases of Spark I have seen significant improvements when your data is in Parquet format, which I see yours is. But since you are not using SparkSession and are instead using the older SQLContext API, there is a high chance that you are not benefiting from those improvements at all. Is there any particular reason you would prefer not to use SparkSession?

Regards,
Gourav

On Wed, Mar 22, 2017 at 8:30 PM, Matt Deaver wrote:
> [original post appears in full below]
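Gourav's suggestion refers to the SparkSession entry point introduced in Spark 2.0, which subsumes SQLContext and enables the newer Parquet read paths. A hedged sketch of what the migration might look like (assuming pyspark >= 2.0; the bucket/prefix placeholders are carried over from the original post, and the app name is my own invention):

```python
from pyspark.sql import SparkSession

# SparkSession (Spark 2.0+) replaces SQLContext/HiveContext as the entry point.
spark = SparkSession.builder \
    .appName("skewed-partition-read") \
    .getOrCreate()

# The same reads as in the original post, via the new entry point.
df1 = spark.read.parquet("s3a://[bucket1]/[prefix1]/")
df2 = spark.read.parquet("s3a://[bucket2]/[prefix2]/")

# unionAll was renamed union in 2.0, and registerTempTable was
# superseded by createOrReplaceTempView.
df1.union(df2).createOrReplaceTempView("union_table")
spark.sql("select count(*) from union_table")
```

This is an untested sketch: it needs a running Spark installation, real S3 buckets, and matching schemas for the union, so treat it as a shape of the migration rather than a drop-in replacement.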
Re: Best way to deal with skewed partition sizes
And on another note, is there any particular reason for using s3a:// instead of s3://?

Regards,
Gourav

On Wed, Mar 22, 2017 at 8:30 PM, Matt Deaver wrote:
> [original post appears in full below]
Re: Best way to deal with skewed partition sizes
Could you share the event timeline and DAG for the time-consuming stages from the Spark UI?

On Thu, Mar 23, 2017 at 4:30 AM, Matt Deaver wrote:
> [original post appears in full below]
Best way to deal with skewed partition sizes
For various reasons, our data set is partitioned in Spark by customer id and saved to S3. When trying to read this data, however, the larger partitions make it difficult to parallelize jobs. For example, out of a couple thousand companies, some have <10 MB of data while some have >10 GB. This is the code I'm using in a Zeppelin notebook, and it takes a very long time to read in (2+ hours on a ~200 GB dataset from S3):

    df1 = sqlContext.read.parquet("s3a://[bucket1]/[prefix1]/")
    df2 = sqlContext.read.parquet("s3a://[bucket2]/[prefix2]/")

    # generate a bunch of derived columns here for df1
    df1 = df1.withColumn('derivedcol1', df1.source_col)

    # limit output columns for later union
    df1 = df1.select(
        [limited set of columns]
    )

    # generate a bunch of derived columns here for df2
    df2 = df2.withColumn('derivedcol1', df2.source_col)

    # limit output columns for later union
    df2 = df2.select(
        [limited set of columns]
    )

    print(df1.rdd.getNumPartitions())
    print(df2.rdd.getNumPartitions())

    merge_df = df1.unionAll(df2)
    # repartition returns a new DataFrame, so the result must be reassigned
    merge_df = merge_df.repartition(300)

    merge_df.registerTempTable("union_table")
    sqlContext.cacheTable("union_table")
    sqlContext.sql("select count(*) from union_table").collect()

Any suggestions on making this faster?
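One common mitigation for this kind of key skew, not proposed in the thread itself but worth sketching, is to "salt" the skewed key: append a small random suffix so that rows for a single huge customer spread across several partitions instead of one. The `NUM_SALTS` fan-out and `salted_key` helper below are my own hypothetical names, and the illustration is plain Python rather than Spark:

```python
import random

NUM_SALTS = 8  # hypothetical fan-out; tune to the size of the largest customer

def salted_key(customer_id, rng=random):
    """Build a composite key like '12345_3' so rows for one customer
    can land in up to NUM_SALTS different partitions instead of one."""
    return "{}_{}".format(customer_id, rng.randint(0, NUM_SALTS - 1))

# A single hot customer id fans out across several partition keys.
keys = {salted_key("12345") for _ in range(1000)}
print(len(keys))  # at most NUM_SALTS distinct salted keys
```

In Spark itself the equivalent would look roughly like `df.withColumn('salt', (rand() * NUM_SALTS).cast('int')).repartition('customer_id', 'salt')`, though I have not verified that against the Spark version used in this thread; the trade-off is that any later per-customer aggregation must first combine the salted sub-groups.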