This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 812ad55  [SPARK-26103][SQL] Limit the length of debug strings for query plans
812ad55 is described below

commit 812ad5546148d2194ab0e4230ee85b8f6a5be2fb
Author: Dave DeCaprio <da...@alum.mit.edu>
AuthorDate: Wed Mar 13 09:58:43 2019 -0700

    [SPARK-26103][SQL] Limit the length of debug strings for query plans

    ## What changes were proposed in this pull request?

    The PR puts a limit on the size of the debug string generated for a tree
    node. This helps to fix out-of-memory errors when large plans have huge
    debug strings. In addition to SPARK-26103, this should also address
    SPARK-23904 and SPARK-25380. An alternative solution was proposed in
    #23076, but it does not address all the cases that can produce an
    oversized query string. This limit applies only to calls to treeString
    that don't pass a Writer, which makes it play nicely with #22429,
    #23018 and #230 [...]

    - A new configuration parameter called spark.sql.maxPlanStringLength
      was added to control the length of the plans.
    - When plans are truncated, "..." is printed to indicate that it isn't
      a full plan.
    - A warning is printed out the first time a truncated plan is
      displayed. The warning explains what happened and how to adjust the
      limit.

    ## How was this patch tested?

    Unit tests were created for the new size-limited StringConcat. Also, a
    unit test for TreeNode was created that checks that a long plan is
    correctly truncated.

    Closes #23169 from DaveDeCaprio/text-plan-size.

    Lead-authored-by: Dave DeCaprio <da...@alum.mit.edu>
    Co-authored-by: David DeCaprio <da...@alum.mit.edu>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  4 +-
 .../spark/sql/catalyst/util/StringUtils.scala      | 58 ++++++++++++++++++----
 .../org/apache/spark/sql/internal/SQLConf.scala    | 13 +++++
 .../spark/sql/catalyst/trees/TreeNodeSuite.scala   | 29 ++++++++++-
 .../spark/sql/catalyst/util/StringUtilsSuite.scala | 33 +++++++++---
 .../spark/sql/execution/QueryExecution.scala       | 14 +++---
 6 files changed, 126 insertions(+), 25 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index d214ebb..72b1931 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -480,7 +480,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       verbose: Boolean,
       addSuffix: Boolean = false,
       maxFields: Int = SQLConf.get.maxToStringFields): String = {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     treeString(concat.append, verbose, addSuffix, maxFields)
     concat.toString
   }
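For illustration, a hedged end-to-end sketch of the new limit from user code; the session setup and the deliberately wide query are assumptions made for this example, not part of the change:

    import org.apache.spark.sql.SparkSession

    // Lower the cap on rendered plan strings so that very large query plans
    // cannot exhaust driver memory when printed.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("plan-length-demo")
      .getOrCreate()
    spark.conf.set("spark.sql.maxPlanStringLength", "8k") // config added below

    // Build a deliberately wide plan; the exact query is arbitrary.
    val wide = (1 to 50)
      .map(i => spark.range(10).withColumnRenamed("id", s"id_$i"))
      .reduce(_ crossJoin _)

    // treeString (and hence explain/toString) is now rendered through the
    // PlanStringConcat introduced below and ends with "... N more characters"
    // once the rendered plan exceeds the configured limit.
    println(wide.queryExecution.executedPlan.treeString)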
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 643b83b..6118d8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -17,14 +17,18 @@
 
 package org.apache.spark.sql.catalyst.util
 
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.regex.{Pattern, PatternSyntaxException}
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types.UTF8String
 
-object StringUtils {
+object StringUtils extends Logging {
 
   /**
    * Validate and convert SQL 'like' pattern to a Java regular expression.
@@ -92,20 +96,29 @@ object StringUtils {
 
   /**
    * Concatenation of sequence of strings to final string with cheap append method
-   * and one memory allocation for the final string.
+   * and one memory allocation for the final string. Can also bound the final size of
+   * the string.
   */
-  class StringConcat {
-    private val strings = new ArrayBuffer[String]
-    private var length: Int = 0
+  class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+    protected val strings = new ArrayBuffer[String]
+    protected var length: Int = 0
+
+    def atLimit: Boolean = length >= maxLength
 
    /**
     * Appends a string and accumulates its length to allocate a string buffer for all
-     * appended strings once in the toString method.
+     * appended strings once in the toString method. If the accumulated length crosses
+     * maxLength, extra input is truncated, though the length counter keeps growing.
     */
    def append(s: String): Unit = {
      if (s != null) {
-        strings.append(s)
-        length += s.length
+        val sLen = s.length
+        if (!atLimit) {
+          val available = maxLength - length
+          val stringToAppend = if (available >= sLen) s else s.substring(0, available)
+          strings.append(stringToAppend)
+        }
+        length += sLen
      }
    }
 
@@ -114,9 +127,36 @@ object StringUtils {
     * returns concatenated string.
     */
    override def toString: String = {
-      val result = new java.lang.StringBuilder(length)
+      val finalLength = if (atLimit) maxLength else length
+      val result = new java.lang.StringBuilder(finalLength)
      strings.foreach(result.append)
      result.toString
    }
  }
+
+  /**
+   * A string concatenator for plan strings. Takes its maximum length from a configured
+   * value, and logs a warning when a plan is truncated.
+   */
+  class PlanStringConcat extends StringConcat(Math.max(0, SQLConf.get.maxPlanStringLength - 30)) {
+    override def toString: String = {
+      if (atLimit) {
+        logWarning(
+          "Truncated the string representation of a plan since it was too long. The " +
+            s"plan had length ${length} and the maximum is ${maxLength}. This behavior " +
+            s"can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+        val truncateMsg = if (maxLength == 0) {
+          s"Truncated plan of $length characters"
+        } else {
+          s"... ${length - maxLength} more characters"
+        }
+        val result = new java.lang.StringBuilder(maxLength + truncateMsg.length)
+        strings.foreach(result.append)
+        result.append(truncateMsg)
+        result.toString
+      } else {
+        super.toString
+      }
+    }
+  }
 }
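As a standalone illustration of the truncation semantics implemented above, here is a simplified, self-contained re-implementation with the SQLConf wiring and the warning stripped out; BoundedConcat is a name invented for this sketch, and note that the real PlanStringConcat additionally reserves 30 characters of headroom, apparently for the "... N more characters" suffix:

    import scala.collection.mutable.ArrayBuffer

    // Simplified sketch of the bounded concatenator: appends stay cheap, the
    // single allocation happens in toString, and an append that would cross
    // maxLength is cut at the boundary while `length` keeps counting every
    // character offered (so callers can report how much was dropped).
    class BoundedConcat(val maxLength: Int) {
      private val strings = new ArrayBuffer[String]
      private var length: Int = 0

      def atLimit: Boolean = length >= maxLength

      def append(s: String): Unit = if (s != null) {
        val sLen = s.length
        if (!atLimit) {
          val available = maxLength - length
          strings.append(if (available >= sLen) s else s.substring(0, available))
        }
        length += sLen
      }

      override def toString: String = {
        val result = new java.lang.StringBuilder(math.min(length, maxLength))
        strings.foreach(result.append)
        result.toString
      }
    }

    // Mirrors the "string concatenation with limit" test further below:
    val c = new BoundedConcat(7)
    Seq("under", "over", "extra").foreach(c.append)
    assert(c.toString == "underov")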
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 193d311..20f4080 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1691,6 +1691,17 @@ object SQLConf {
     .intConf
     .createWithDefault(25)
 
+  val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
+    .doc("Maximum number of characters to output for a plan string. If the plan is " +
+      "longer, further output will be truncated. The default setting always generates a full " +
+      "plan. Set this to a lower value such as 8k if plan strings are taking up too much " +
+      "memory or are causing OutOfMemory errors in the driver or UI processes.")
+    .bytesConf(ByteUnit.BYTE)
+    .checkValue(i => i >= 0 && i <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, "Invalid " +
+      "value for 'spark.sql.maxPlanStringLength'. Length must be a valid string length " +
+      "(nonnegative and shorter than the maximum size).")
+    .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+
   val SET_COMMAND_REJECTS_SPARK_CORE_CONFS =
     buildConf("spark.sql.legacy.setCommandRejectsSparkCoreConfs")
       .internal()
@@ -2146,6 +2157,8 @@ class SQLConf extends Serializable with Logging {
 
   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
 
+  def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
+
   def setCommandRejectsSparkCoreConfs: Boolean =
     getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)
 
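Two hedged notes on using the new entry; the `spark` session here is assumed, and the exception type is an assumption about how checkValue failures surface:

    // The entry is a bytesConf, so size suffixes are accepted; "8k" parses
    // to 8 * 1024 characters. Both forms below set the same session conf.
    spark.conf.set("spark.sql.maxPlanStringLength", "8k")
    spark.sql("SET spark.sql.maxPlanStringLength=8k")

    // checkValue rejects anything outside [0, MAX_ROUNDED_ARRAY_LENGTH];
    // assumption: this surfaces as an IllegalArgumentException at set time.
    // spark.conf.set("spark.sql.maxPlanStringLength", "-1")

    // Inside Spark, the effective limit is read back through SQLConf:
    import org.apache.spark.sql.internal.SQLConf
    val limit: Int = SQLConf.get.maxPlanStringLength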
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index cb911d7..e7ad04f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -33,9 +33,10 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions.DslString
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin}
+import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin, SQLHelper}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Union}
 import org.apache.spark.sql.catalyst.plans.physical.{IdentityBroadcastMode, RoundRobinPartitioning, SinglePartition}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 
@@ -81,7 +82,7 @@ case class SelfReferenceUDF(
   def apply(key: String): Boolean = config.contains(key)
 }
 
-class TreeNodeSuite extends SparkFunSuite {
+class TreeNodeSuite extends SparkFunSuite with SQLHelper {
   test("top node changed") {
     val after = Literal(1) transform { case Literal(1, _) => Literal(2) }
     assert(after === Literal(2))
@@ -595,4 +596,28 @@ class TreeNodeSuite extends SparkFunSuite {
     val expected = Coalesce(Stream(Literal(1), Literal(3)))
     assert(result === expected)
   }
+
+  test("treeString limits plan length") {
+    withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "200") {
+      val ds = (1 until 20).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+        Add(Literal(x), treeNode)
+      }
+
+      val planString = ds.treeString
+      logWarning("Plan string: " + planString)
+      assert(planString.endsWith(" more characters"))
+      assert(planString.length <= SQLConf.get.maxPlanStringLength)
+    }
+  }
+
+  test("treeString limit at zero") {
+    withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "0") {
+      val ds = (1 until 2).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+        Add(Literal(x), treeNode)
+      }
+
+      val planString = ds.treeString
+      assert(planString.startsWith("Truncated plan of"))
+    }
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 616ec12..63d3831 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -46,14 +46,35 @@ class StringUtilsSuite extends SparkFunSuite {
 
   test("string concatenation") {
     def concat(seq: String*): String = {
-      seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
+      seq.foldLeft(new StringConcat()) { (acc, s) => acc.append(s); acc }.toString
     }
 
     assert(new StringConcat().toString == "")
-    assert(concat("") == "")
-    assert(concat(null) == "")
-    assert(concat("a") == "a")
-    assert(concat("1", "2") == "12")
-    assert(concat("abc", "\n", "123") == "abc\n123")
+    assert(concat("") === "")
+    assert(concat(null) === "")
+    assert(concat("a") === "a")
+    assert(concat("1", "2") === "12")
+    assert(concat("abc", "\n", "123") === "abc\n123")
+  }
+
+  test("string concatenation with limit") {
+    def concat(seq: String*): String = {
+      seq.foldLeft(new StringConcat(7)) { (acc, s) => acc.append(s); acc }.toString
+    }
+    assert(concat("under") === "under")
+    assert(concat("under", "over", "extra") === "underov")
+    assert(concat("underover") === "underov")
+    assert(concat("under", "ov") === "underov")
+  }
+
+  test("string concatenation return value") {
+    def checkLimit(s: String): Boolean = {
+      val sc = new StringConcat(7)
+      sc.append(s)
+      sc.atLimit
+    }
+    assert(!checkLimit("under"))
+    assert(checkLimit("1234567"))
+    assert(checkLimit("1234567890"))
   }
 }
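To make the fixture in the TreeNodeSuite tests above concrete, here is a plain-Scala illustration of the foldLeft pattern; the Expr, Lit and AddExpr types are invented for this sketch and are not Catalyst classes:

    // Each fold step wraps the previous tree one level deeper, so 19
    // iterations yield a 19-level chain whose rendered form easily
    // exceeds a 200-character plan-string limit.
    sealed trait Expr
    case class Lit(v: String) extends Expr
    case class AddExpr(left: Lit, right: Expr) extends Expr

    val deep: Expr = (1 until 20).foldLeft(Lit("TestLiteral"): Expr) {
      case (tree, x) => AddExpr(Lit(x.toString), tree)
    }
    // deep == AddExpr(Lit("19"), AddExpr(Lit("18"), ... , Lit("TestLiteral")))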
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 49d6acf..5d2710bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
+import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
@@ -114,7 +114,7 @@ class QueryExecution(
       ReuseSubquery(sparkSession.sessionState.conf))
 
   def simpleString: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     concat.append("== Physical Plan ==\n")
     QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false)
     concat.append("\n")
@@ -142,13 +142,13 @@ class QueryExecution(
   }
 
   override def toString: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     writePlans(concat.append, SQLConf.get.maxToStringFields)
     concat.toString
   }
 
   def stringWithStats: String = withRedaction {
-    val concat = new StringConcat()
+    val concat = new PlanStringConcat()
     val maxFields = SQLConf.get.maxToStringFields
 
     // trigger to compute stats for logical plans
@@ -203,9 +203,11 @@ class QueryExecution(
     val filePath = new Path(path)
     val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf())
     val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
-
+    val append = (s: String) => {
+      writer.write(s)
+    }
     try {
-      writePlans(writer.write, maxFields)
+      writePlans(append, maxFields)
       writer.write("\n== Whole Stage Codegen ==\n")
       org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan)
     } finally {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org