Re: [Spark Core]: Adding support for size based partition coalescing

2021-05-25 Thread Wenchen Fan
Without AQE, repartition() simply creates 200 partitions (the value of spark.sql.shuffle.partitions) AFAIK. AQE helps you coalesce the partitions into a reasonable number, by size. Note that you need to tune spark.sql.shuffle.partitions to make sure it's big enough, as AQE cannot increase
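
A minimal sketch of the configuration interplay described above, assuming Spark 3.x with AQE (the app name and the concrete values are illustrative, not taken from the thread):

    import org.apache.spark.sql.SparkSession

    // spark.sql.shuffle.partitions is the upper bound that AQE coalesces down
    // from by size; AQE cannot raise it, so it is set generously here.
    val spark = SparkSession.builder()
      .appName("aqe-size-coalesce-sketch")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.shuffle.partitions", "2000")
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
      .getOrCreate()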

Re: [Spark Core]: Adding support for size based partition coalescing

2021-05-24 Thread Tom Graves
So repartition() would look at some other config (spark.sql.adaptive.advisoryPartitionSizeInBytes) to decide the partition size to use, then? Does it require AQE? If so, what does a repartition() call do if AQE is not enabled? This is essentially a new API, so would repartitionBySize
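
A hypothetical sketch of the repartitionBySize idea raised above, built only on existing APIs; this helper is not part of Spark, and the size estimate from the optimizer's statistics can be rough or missing:

    import org.apache.spark.sql.Dataset

    // Hypothetical helper, not a Spark API: pick a partition count from an
    // estimated total size and a target partition size, then repartition.
    def repartitionBySize[T](ds: Dataset[T], targetBytes: Long = 128L * 1024 * 1024): Dataset[T] = {
      val estimatedBytes = ds.queryExecution.optimizedPlan.stats.sizeInBytes  // BigInt, may be rough
      val numPartitions = ((estimatedBytes + targetBytes - 1) / targetBytes)
        .min(BigInt(10000))   // guard against wildly large or missing estimates
        .max(BigInt(1))
        .toInt
      ds.repartition(numPartitions)
    }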

Re: [Spark Core]: Adding support for size based partition coalescing

2021-05-24 Thread Wenchen Fan
Ideally this should be handled by the underlying data source, producing a reasonably partitioned RDD as the input data. However, if we already have a poorly partitioned RDD at hand and want to repartition it properly, I think an extra shuffle is required so that we can know the partition size

Re: [Spark Core]: Adding support for size based partition coalescing

2021-05-21 Thread mhawes
Adding /another/ update to say that I'm currently planning on using a recently introduced feature whereby calling `.repartition()` with no args will cause the dataset to be optimised by AQE. This actually suits our use-case perfectly! Example:
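
The example in the original message was truncated by the archive; below is a minimal sketch of the pattern being described, with AQE enabled and illustrative paths (not the author's original code):

    // Assumes a Spark version with the no-argument repartition() feature
    // referred to above; paths and variable names are illustrative.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

    val skewed = spark.read.parquet("/path/to/skewed/input")
    val evened = skewed.repartition()   // no args: partition count and sizes left to AQE
    evened.write.parquet("/path/to/output")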

Re: [Spark Core]: Adding support for size based partition coalescing

2021-05-11 Thread mhawes
Hi angers.zhu, Reviving this thread to say that, while it's not ideal (as it recomputes the last stage), I think the `SizeBasedCoaleaser` solution seems like a good option. If you don't mind re-raising that PR, that would be great. Alternatively, I'm happy to make the PR based on your previous PR?

Re: [Spark Core]: Adding support for size based partition coalescing

2021-04-01 Thread German Schiavon
Hi! Have you tried spark.sql.files.maxRecordsPerFile? As a workaround, you could check how many rows amount to 128 MB and then set that number in that property. Best
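
A rough sketch of the workaround suggested above; the 128 MB target, the df variable, and the write path are illustrative, and the row-size estimate uses the optimizer's statistics, which may be imprecise:

    // Estimate how many rows amount to ~128 MB, then cap output files at that
    // many records via spark.sql.files.maxRecordsPerFile.
    val targetBytes = 128L * 1024 * 1024
    val totalBytes  = df.queryExecution.optimizedPlan.stats.sizeInBytes  // BigInt estimate
    val totalRows   = df.count()
    val bytesPerRow = (totalBytes / BigInt(totalRows).max(BigInt(1))).max(BigInt(1)).toLong
    val rowsPerFile = targetBytes / bytesPerRow

    spark.conf.set("spark.sql.files.maxRecordsPerFile", rowsPerFile.toString)
    df.write.parquet("/path/to/output")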

Re: [Spark Core]: Adding support for size based partition coalescing

2021-03-31 Thread mhawes
Okay, from looking closer at some of the code, I'm not sure that what I'm asking for in terms of adaptive execution makes much sense, as it can only happen between stages, i.e. optimising future /stages/ based on the results of previous stages. Thus an "on-demand" adaptive coalesce doesn't make much

Re: [Spark Core]: Adding support for size based partition coalescing

2021-03-31 Thread mhawes
Hi angers.zhu, Thanks for pointing me towards that PR. I think the main issue there is that the coalesce operation requires an additional computation, which in this case is undesirable. It also approximates the row size rather than directly using the partition size. Thus it has the potential

Re: [Spark Core]: Adding support for size based partition coalescing

2021-03-31 Thread angers zhu
Hi all, do you mean something like this: https://github.com/apache/spark/pull/27248/files? If you need it, I can raise a PR to add a SizeBasedCoaleaser.

Re: [Spark Core]: Adding support for size based partition coalescing

2021-03-30 Thread mhawes
Hi Pol, I had considered repartitioning, but the main issue for me there is that it will trigger a shuffle and could significantly slow down the query/application as a result. Thanks for contributing that as an alternative suggestion though :)

Re: [Spark Core]: Adding support for size based partition coalescing

2021-03-30 Thread Pol Santamaria
Hi Matt, I have encountered the same issue several times, so I totally agree with you that it would be a useful addition to Spark. I frequently solve the imbalance by coding a custom partitioner, which is far from ideal since it means dropping down to RDDs. I don't know the Spark code base well enough to
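
For context, a minimal sketch of the kind of custom partitioner being described, usable only on pair RDDs; the lookup-table scheme and all names are illustrative, not from the thread:

    import org.apache.spark.Partitioner

    // Route known heavy keys to explicit partitions (e.g. chosen from observed
    // key counts) and hash everything else. This only works once you drop down
    // to pair RDDs, which is why it is described above as far from ideal.
    class LookupPartitioner(assignments: Map[String, Int],
                            override val numPartitions: Int) extends Partitioner {
      override def getPartition(key: Any): Int = key match {
        case null => 0
        case k: String if assignments.contains(k) => assignments(k)
        case other =>
          val h = other.hashCode % numPartitions
          if (h < 0) h + numPartitions else h
      }
    }

    // Usage (illustrative): pairRdd.partitionBy(new LookupPartitioner(assignments, 256))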

[Spark Core]: Adding support for size based partition coalescing

2021-03-30 Thread mhawes
Hi all, Sending this first before creating a Jira issue, in an effort to start a discussion :) Problem: we have a situation where we end up with a very large number (O(10K)) of partitions, with very little data in most partitions but a lot of data in some of them. This not only causes slow execution