Re: Best way to deal with skewed partition sizes

2017-03-23 Thread Gourav Sengupta
Hi,

In the latest releases of Spark I have seen significant improvements when the
data is in Parquet format, which yours is.

But since you are using the older sqlContext API rather than SparkSession,
there is a good chance you are not benefiting from those improvements at all.

Is there any particular reason you would prefer not to use SparkSession?
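
For reference, a minimal sketch of the SparkSession equivalent (assuming Spark
2.x is available on your cluster; the bucket and prefix placeholders are
carried over from your code):

from pyspark.sql import SparkSession

# SparkSession (Spark 2.x) replaces the separate SQLContext entry point
spark = SparkSession.builder.getOrCreate()

# the same Parquet reads through the newer API
df1 = spark.read.parquet("s3a://[bucket1]/[prefix1]/")
df2 = spark.read.parquet("s3a://[bucket2]/[prefix2]/")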


Regards,
Gourav


Re: Best way to deal with skewed partition sizes

2017-03-23 Thread Gourav Sengupta
And on another note, is there any particular reason you are using s3a://
instead of s3://?


Regards,
Gourav


Re: Best way to deal with skewed partition sizes

2017-03-22 Thread Ryan
Could you share the event timeline and DAG for the time-consuming stages from
the Spark UI?


Best way to deal with skewed partition sizes

2017-03-22 Thread Matt Deaver
For various reasons, our data set is partitioned in Spark by customer ID
and saved to S3. When trying to read this data, however, the larger
partitions make it difficult to parallelize jobs. For example, out of a
couple thousand companies, some have <10 MB of data while others have >10 GB.
This is the code I'm using in a Zeppelin notebook, and it takes a very long
time to read in (2+ hours on a ~200 GB dataset from S3):

df1 = sqlContext.read.parquet("s3a://[bucket1]/[prefix1]/")
df2 = sqlContext.read.parquet("s3a://[bucket2]/[prefix2]/")

# generate a bunch of derived columns here for df1
df1 = df1.withColumn('derivedcol1', df1.source_col)


# limit output columns for later union
df1 = df1.select(
    [limited set of columns]
)

# generate a bunch of derived columns here for df2
df2 = df2.withColumn('derivedcol1', df2.source_col)

# limit output columns for later union
df2 = df2.select(
    [limited set of columns]
)

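# check how many partitions each input DataFrame was read into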
print(df1.rdd.getNumPartitions())
print(df2.rdd.getNumPartitions())

merge_df = df1.unionAll(df2)
# repartition() returns a new DataFrame, so the result needs to be reassigned
merge_df = merge_df.repartition(300)

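# cache the unioned table and force materialization with a count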
merge_df.registerTempTable("union_table")
sqlContext.cacheTable("union_table")
sqlContext.sql("select count(*) from union_table").collect()

Any suggestions on making this faster?
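
For context, the data set was written with a per-customer partitionBy along
these lines; the source DataFrame, the 'customer_id' column name, and the
paths below are illustrative placeholders rather than the exact job:

source_df = sqlContext.read.parquet("s3a://[source-bucket]/[source-prefix]/")
source_df.write \
    .partitionBy('customer_id') \
    .parquet("s3a://[output-bucket]/[output-prefix]/")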