[ https://issues.apache.org/jira/browse/SPARK-23970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16438132#comment-16438132 ]
Liang-Chi Hsieh commented on SPARK-23970:
-----------------------------------------

I think the documentation of {{coalesce}} might answer this; let me quote it:

 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can call repartition. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).

When you do a {{coalesce}}, you make your computation take place on that reduced number of partitions, which is why the upstream read and filter also run in only 32 tasks here. A short sketch contrasting this with {{repartition}} follows the quoted issue below.

> pyspark - simple filter/select doesn't use all tasks when coalesce is set
> --------------------------------------------------------------------------
>
>                 Key: SPARK-23970
>                 URL: https://issues.apache.org/jira/browse/SPARK-23970
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.0, 2.2.1
>            Reporter: Matthew Anthony
>            Priority: Major
>
> Running in (py)spark 2.2.
> Marking this as PySpark, but I have not confirmed whether this is Spark-wide;
> I've observed it in pyspark, which is my preferred API.
> {code:java}
> df = spark.sql(
> """
> select <redacted>
> from <inputtbl>
> where <conditions>
> """
> )
> df.coalesce(32).write.parquet(...){code}
> The above code will only attempt to use 32 tasks to read and process all of
> the original input data. This compares to
> {code:java}
> df = spark.sql(
> """
> select <redacted>
> from <inputtbl>
> where <conditions>
> """
> ).cache()
> df.count()
> df.coalesce(32).write.parquet(...){code}
> where this will use the full complement of tasks available to the cluster to
> do the initial filter, with a subsequent shuffle to coalesce and write. The
> latter execution path is far more efficient, particularly at large volumes
> where filtering removes most records, and it should be the default. Note that
> in the real setting in which I am running this, I'm operating a 20-node
> cluster with 16 cores and 56 GB RAM per machine, and processing well over a TB
> of raw data in <inputtbl>. The job I am testing generates approximately
> 300,000 read tasks in the latter version of the code when it is not
> constrained by the former's execution plan.
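To make the distinction concrete, here is a minimal sketch of the alternative the {{coalesce}} docstring suggests, assuming an existing SparkSession; the table name, filter, and output paths are hypothetical stand-ins for the redacted ones above:

{code:java}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical table and filter, standing in for the redacted query above.
df = spark.sql("""
    select *
    from some_input_table
    where some_column > 0
""")

# coalesce(32) is folded into the same stage without a shuffle, so the
# upstream scan and filter themselves run in only 32 tasks.
df.coalesce(32).write.parquet("/tmp/out_with_coalesce")

# repartition(32) inserts a shuffle boundary: the scan and filter keep their
# full upstream parallelism, and only the post-shuffle write uses 32 tasks.
df.repartition(32).write.parquet("/tmp/out_with_repartition")
{code}

Compared with the cache()/count() workaround in the issue description, {{repartition}} keeps the filter stage fully parallel without materializing the intermediate result in memory, at the cost of one shuffle.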