[jira] [Updated] (SPARK-33152) Constraint Propagation code causes OOM issues or increasing compilation time to hours
[ https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Asif updated SPARK-33152:
-------------------------
    Affects Version/s: 3.1.2
           Issue Type: Improvement  (was: Bug)

                 Key: SPARK-33152
                 URL: https://issues.apache.org/jira/browse/SPARK-33152
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.4.0, 3.0.1, 3.1.2
            Reporter: Asif
            Priority: Major
   Original Estimate: 168h
  Remaining Estimate: 168h

We encountered this issue at Workday.

The current constraint-propagation code pessimistically generates all possible permutations of each base constraint for the aliases in a Project node. This blows up the number of generated constraints, causing OOM errors at query compile time, or queries that take anywhere from 18 minutes to 2 hours to compile.

The problematic piece of code is in LogicalPlan.getAliasedConstraints:

    projectList.foreach {
      case a @ Alias(l: Literal, _) =>
        allConstraints += EqualNullSafe(a.toAttribute, l)
      case a @ Alias(e, _) =>
        // For every alias in `projectList`, replace the reference in
        // constraints by its attribute.
        allConstraints ++= allConstraints.map(_ transform {
          case expr: Expression if expr.semanticEquals(e) =>
            a.toAttribute
        })
        allConstraints += EqualNullSafe(e, a.toAttribute)
      case _ => // Don't change.
    }

Consider a hypothetical plan:

    Project (a, a as a1, a as a2, a as a3, b, b as b1, b as b2, c, c as c1, c as c2, c as c3)
      |
    Filter f(a, b, c)
      |
    Base Relation (a, b, c)

The projection gives us the alias groups

    a, a1, a2, a3
    b, b1, b2
    c, c1, c2, c3

Let's say, hypothetically, that f(a, b, c) has a occurring once, b occurring twice, and c occurring three times. Then at the Project node, the number of constraints generated from the single base constraint f(a, b, c) is

    4C1 * 3C2 * 4C3 = 4 * 3 * 4 = 48

In our case we have seen the number of constraints grow far larger than this, as there are complex case statements in the projections. Spark generates all of these constraints pessimistically, for the filter pruning and join predicate push-down the optimizer may encounter as it traverses up the tree.

This issue is solved at our end by modifying the Spark code to use different logic. The idea is simple: instead of pessimistically generating all possible combinations of a base constraint, store only the original base constraints and track the aliases at each level. The principle followed is:

1) Store the base constraint and keep track of the aliases for each underlying attribute.
2) If a base attribute composing the constraint is not in the output set, check whether the constraint survives by substituting the removed attribute with the next available alias's attribute.

To check whether a filter can be pruned, canonicalize it using the attribute at the 0th position of each tracking list and compare the result with the underlying base constraints.

To elaborate using the plan above: at the Project node we have the constraint f(a, b, c), and we keep track of the aliases

    List 1: a, a1.attribute, a2.attribute, a3.attribute
    List 2: b, b1.attribute, b2.attribute
    List 3: c, c1.attribute, c2.attribute, c3.attribute

Suppose that above the Project node we encounter a filter f(a1, b2, c3). We canonicalize the filter using the lists above, converting it to f(a, b, c), and compare it with the stored base constraints.
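To make the bookkeeping concrete, here is a minimal, self-contained Scala sketch of the tracking-and-canonicalize idea. It is a toy model written for this description, not the actual patch: Expr, Attr, Func, AliasLists, canonicalize and canPrune are all invented names.

    // A toy expression: an attribute reference or a function applied to expressions.
    sealed trait Expr
    case class Attr(name: String) extends Expr
    case class Func(name: String, args: List[Expr]) extends Expr

    object ConstraintTracking {
      // One tracking list per base attribute: the base attribute at position 0,
      // then its aliases, e.g. "a" -> List("a", "a1", "a2", "a3").
      type AliasLists = Map[String, List[String]]

      // Rewrite every attribute to the 0th element of the tracking list containing it.
      def canonicalize(e: Expr, lists: AliasLists): Expr = e match {
        case Attr(n) =>
          lists.collectFirst { case (_, aliases) if aliases.contains(n) => Attr(aliases.head) }
            .getOrElse(Attr(n))
        case Func(n, args) =>
          Func(n, args.map(canonicalize(_, lists)))
      }

      // A filter is prunable if its canonical form matches a stored base constraint.
      def canPrune(filter: Expr, base: Set[Expr], lists: AliasLists): Boolean =
        base.contains(canonicalize(filter, lists))
    }

    object Demo extends App {
      import ConstraintTracking._
      val lists: AliasLists = Map(
        "a" -> List("a", "a1", "a2", "a3"),
        "b" -> List("b", "b1", "b2"),
        "c" -> List("c", "c1", "c2", "c3"))
      val base: Set[Expr] = Set(Func("f", List(Attr("a"), Attr("b"), Attr("c"))))
      val filter = Func("f", List(Attr("a1"), Attr("b2"), Attr("c3")))
      println(canPrune(filter, base, lists)) // true: f(a1, b2, c3) canonicalizes to f(a, b, c)
    }

The point of the sketch: only one constraint is ever stored per base constraint, and the alias bookkeeping is linear in the number of aliases rather than multiplicative.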
For predicate push-down, instead of generating all the redundant combinations of constraints, generate just one constraint per alias. In the current Spark code, filter push-down in any case happens for only one variable at a time, so expanding the filter f(a, b, c) to

    f(a, b, c), f(a1, b, c), f(a2, b, c), f(a3, b, c),
    f(a, b1, c), f(a, b2, c),
    f(a, b, c1), f(a, b, c2), f(a, b, c3)

suffices, rather than generating all the redundant combinations; a sketch of this expansion follows the plan below. In fact, the code can easily be modified to generate only those constraints which involve the variables forming the join condition, so the number of constraints generated on expansion is reduced further. We already have code to generate compound filters for push-down (joins on multiple conditions), which can be used for single-variable push-down too.

To elaborate the logic further, consider the hypothetical plan above with another Project on top (assume the collapse-project rule is not applied):

    Project (a1, a1 as a4, b, c1, c1 as c4)
      |
    Project (a, a as a1, a as a2, a as a3, b, b as b1, b as b2, c, c as c1, c as c2, c as c3)
      |
    Filter f(a, b, c)
      |
    Base Relation (a, b, c)

So at project node 2, the constraints data ...
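A sketch of the one-alias-at-a-time expansion, reusing the toy Expr/AliasLists model from the earlier sketch (expandForPushdown and substitute are again invented names; the defs could sit in the same ConstraintTracking object):

    // Replace every occurrence of one attribute name with another.
    def substitute(e: Expr, from: String, to: String): Expr = e match {
      case Attr(`from`)  => Attr(to)
      case a: Attr       => a
      case Func(n, args) => Func(n, args.map(substitute(_, from, to)))
    }

    // Expand a base constraint one alias at a time: the base form plus, for each
    // attribute, one variant per alias. With the tracking lists above, f(a, b, c)
    // expands to 1 + 3 + 2 + 3 = 9 constraints instead of 48.
    def expandForPushdown(base: Expr, lists: AliasLists): Set[Expr] = {
      val variants = for {
        (attr, aliases) <- lists.toSet[(String, List[String])]
        alias           <- aliases.tail // skip position 0: the base attribute itself
      } yield substitute(base, attr, alias)
      variants + base
    }

Restricting the expansion to attributes that appear in a join condition would shrink this set further, as noted above.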
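Principle 2 above governs what happens at project node 2: a, a2 and a3 disappear from the output, so the base constraint can survive by promoting a1 (and likewise c1) to the head of its tracking list. A hedged sketch of that survival check in the same toy model (survive is an invented name; appending the newly introduced aliases a4 and c4 to the lists would be a separate bookkeeping step, not shown):

    // Principle 2: when a Project drops attributes, a base constraint survives if
    // every one of its attributes still has at least one visible alias. Each
    // surviving list is re-keyed to its first remaining element, and the
    // constraint is rewritten accordingly.
    def survive(base: Expr, lists: AliasLists, output: Set[String])
        : Option[(Expr, AliasLists)] = {
      // Drop aliases no longer present in the output, preserving order.
      val pruned = lists.map { case (k, vs) => k -> vs.filter(output.contains) }
      if (pruned.values.exists(_.isEmpty)) None // some attribute lost every alias
      else {
        // Rewrite each old base attribute to its first surviving alias.
        val rewritten = pruned.foldLeft(base) { case (e, (old, vs)) =>
          substitute(e, old, vs.head)
        }
        Some((rewritten, pruned.map { case (_, vs) => vs.head -> vs }))
      }
    }

For the plan above, with output Set("a1", "a4", "b", "c1", "c4"), the base constraint f(a, b, c) survives as f(a1, b, c1), with tracking lists a1 -> List(a1), b -> List(b) and c1 -> List(c1).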
[jira] [Updated] (SPARK-33152) Constraint Propagation code causes OOM issues or increasing compilation time to hours
[ https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Asif updated SPARK-33152:
-------------------------
    Affects Version/s: 3.0.1
[jira] [Updated] (SPARK-33152) Constraint Propagation code causes OOM issues or increasing compilation time to hours
[ https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuming Wang updated SPARK-33152:
--------------------------------
    Target Version/s:   (was: 2.4.0)