[ https://issues.apache.org/jira/browse/SPARK-34369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-34369:
------------------------------------

    Assignee: (was: Apache Spark)

> Track number of pairs processed out of Join
> -------------------------------------------
>
>                 Key: SPARK-34369
>                 URL: https://issues.apache.org/jira/browse/SPARK-34369
>             Project: Spark
>          Issue Type: New Feature
>          Components: Web UI
>    Affects Versions: 3.2.0
>            Reporter: Srinivas Rishindra Pothireddi
>            Priority: Major
>
> Often users face a scenario where even a modest skew in a join can lead to tasks appearing to be stuck, due to the O(n^2) nature of a join considering all pairs of rows with matching keys. When this happens, users think that Spark has become deadlocked. If there is a bound condition, the "number of output rows" metric may look typical, and other metrics (e.g. shuffle read) may look very modest. In those cases it is very hard to understand what the problem is, and there is no conclusive proof without getting a heap dump and looking at some internal data structures.
> It would be much better if Spark had a metric (which we propose be titled "number of matched pairs", as a companion to "number of output rows") which showed the user how many pairs are being processed in the join. This would be updated in the live UI (when metrics are collected during heartbeats), so the user could easily see what is going on.
> This would even help in cases where there is some other cause of a stuck executor (e.g. network issues), if only to rule out the join as the culprit. For example, you may have 100k records with the same key on each side of a join. That probably won't show up as extreme skew in the task input data, but it becomes 100,000 x 100,000 = 10 billion join pairs that Spark works through in a single task.
>
> To further demonstrate the usefulness of this metric, please follow the steps below.
>
> _val df1 = spark.range(0, 200000).map \{ x => (x % 20, 20) }.toDF("b", "c")_
> _val df2 = spark.range(0, 300000).map \{ x => (77, 20) }.toDF("b", "c")_
>
> _val df3 = spark.range(0, 200000).map(x => (x + 1, x + 2)).toDF("b", "c")_
> _val df4 = spark.range(0, 300000).map(x => (77, x + 2)).toDF("b", "c")_
>
> _val df5 = df1.union(df2)_
> _val df6 = df3.union(df4)_
>
> _df5.createOrReplaceTempView("table1")_
> _df6.createOrReplaceTempView("table2")_
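> As a sanity check on the figures reported in the sections below, here is a short sketch of where the 90,000,490,000 number comes from, given the key frequencies produced by the setup code. It assumes the proposed metric counts every pair of rows with equal join keys, before the bound condition f.c > p.c is applied (which is consistent with the numbers below):
> {code:scala}
> // Key frequencies implied by the setup above:
> //   table1 (df5): keys 0..19 appear 10,000 times each; key 77 appears 300,000 times
> //   table2 (df6): keys 1..200,000 appear once each, plus 300,000 extra rows with key 77
> val pairsForKey77    = 300001L * 300000L   // key 77: 300,001 rows x 300,000 rows
> val pairsForKeys1To19 = 19L * 1L * 10000L  // keys 1..19: 1 row x 10,000 rows each (key 0 never occurs in table2)
> val expectedMatchedPairs = pairsForKey77 + pairsForKeys1To19
> // expectedMatchedPairs == 90000490000L, matching the metric reported for the
> // inner, outer, and cross joins below.
> {code}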
> h3. InnerJoin
> _sql("select p.*, f.* from table2 p join table1 f on f.b = p.b and f.c > p.c").count_
> _number of output rows: 5,580,000_
> _number of matched pairs: 90,000,490,000_
> h3. FullOuterJoin
> _spark.sql("select p.*, f.* from table2 p full outer join table1 f on f.b = p.b and f.c > p.c").count_
> _number of output rows: 6,099,964_
> _number of matched pairs: 90,000,490,000_
> h3. LeftOuterJoin
> _sql("select p.*, f.* from table2 p left outer join table1 f on f.b = p.b and f.c > p.c").count_
> _number of output rows: 6,079,964_
> _number of matched pairs: 90,000,490,000_
> h3. RightOuterJoin
> _spark.sql("select p.*, f.* from table2 p right outer join table1 f on f.b = p.b and f.c > p.c").count_
> _number of output rows: 5,600,000_
> _number of matched pairs: 90,000,490,000_
> h3. LeftSemiJoin
> _spark.sql("select * from table2 p left semi join table1 f on f.b = p.b and f.c > p.c").count_
> _number of output rows: 36_
> _number of matched pairs: 89,994,910,036_
> h3. CrossJoin
> _spark.sql("select p.*, f.* from table2 p cross join table1 f on f.b = p.b and f.c > p.c").count_
> _number of output rows: 5,580,000_
> _number of matched pairs: 90,000,490,000_
> h3. LeftAntiJoin
> _spark.sql("select * from table2 p anti join table1 f on f.b = p.b and f.c > p.c").count_
> _number of output rows: 499,964_
> _number of matched pairs: 89,994,910,036_
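> Finally, a minimal sketch of where such a metric would surface for a user inspecting the plan rather than the live UI. It assumes the spark-shell session above (with table1 and table2 registered) and sets spark.sql.adaptive.enabled=false so the physical join node is easy to locate. Each join operator already exposes a per-operator SQLMetric map containing "number of output rows"; the proposed "number of matched pairs" would be a companion entry in that same map (the numMatchedPairs key mentioned in the comments is an assumption, not an existing Spark metric):
> {code:scala}
> // Disable AQE so the join operator is directly visible in executedPlan.
> spark.conf.set("spark.sql.adaptive.enabled", "false")
>
> import org.apache.spark.sql.execution.joins.BaseJoinExec
>
> val df = spark.sql(
>   "select * from table2 p left semi join table1 f on f.b = p.b and f.c > p.c")
> df.collect()  // executes this exact physical plan (only 36 rows), populating its SQLMetrics
>
> // Walk the executed plan, find the join operator(s), and print their metrics.
> df.queryExecution.executedPlan.collect { case j: BaseJoinExec => j }.foreach { j =>
>   j.metrics.foreach { case (key, metric) =>
>     // Today this prints entries such as numOutputRows -> "number of output rows";
>     // the proposal adds a companion entry (e.g. a hypothetical numMatchedPairs ->
>     // "number of matched pairs") to the same map.
>     println(s"$key -> ${metric.name.getOrElse(key)} = ${metric.value}")
>   }
> }
> {code}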