Repository: spark
Updated Branches:
  refs/heads/branch-2.0 3119d8eef -> 00bbf7873


[SPARK-15794] Should truncate toString() of very wide plans

## What changes were proposed in this pull request?

With very wide tables, e.g. thousands of fields, the plan output is unreadable 
and often causes OOMs due to inefficient string processing. This patch truncates 
all struct and operator field lists to a user-configurable threshold to limit 
the performance impact.

It would also be nice to optimize string generation to avoid this sort of 
O(n^2) slowdown entirely (i.e. use StringBuilder everywhere, including 
expressions), but that is probably too large a change for 2.0 at this point, 
and truncation has other usability benefits.
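
Purely for illustration (not part of the patch), a rough sketch of the new behavior 
with the default threshold; the field names and the printed output below are 
approximate, not copied from a real run:

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// A struct with 1000 fields. With the default limit of 25 fields,
// simpleString keeps the first 24 and collapses the rest.
val wide = StructType((1 to 1000).map(i => StructField(s"c$i", IntegerType)))
println(wide.simpleString)
// prints roughly: struct<c1:int,c2:int,...,c24:int,... 976 more fields>
```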

## How was this patch tested?

Added a microbenchmark that covers this case particularly well. I also ran the 
microbenchmark while varying the truncation threshold.
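
The threshold is read from the new 'spark.debug.maxToStringFields' conf (see the 
Utils.scala change below). A hedged sketch of how a run with a higher threshold 
could be configured; this is not the exact benchmark harness used:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical setup: raise the truncation threshold before building plans.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("WideSchemaBenchmark")
  .config("spark.debug.maxToStringFields", "100")
  .getOrCreate()
```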

```
numFields = 5
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem)            2336 / 2558          0.0       23364.4       0.1X

numFields = 25
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem)            4237 / 4465          0.0       42367.9       0.1X

numFields = 100
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem)          10458 / 11223          0.0      104582.0       0.0X

numFields = Infinity
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
[info]   java.lang.OutOfMemoryError: Java heap space
```

Author: Eric Liang <e...@databricks.com>
Author: Eric Liang <ekhli...@gmail.com>

Closes #13537 from ericl/truncated-string.

(cherry picked from commit b914e1930fd5c5f2808f92d4958ec6fbeddf2e30)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00bbf787
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00bbf787
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00bbf787

Branch: refs/heads/branch-2.0
Commit: 00bbf787340e208cc76230ffd96026c1305f942c
Parents: 3119d8e
Author: Eric Liang <e...@databricks.com>
Authored: Thu Jun 9 18:05:16 2016 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Thu Jun 9 18:05:30 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     | 47 +++++++++++++++++++
 .../org/apache/spark/util/UtilsSuite.scala      |  8 ++++
 .../sql/catalyst/expressions/Expression.scala   |  4 +-
 .../spark/sql/catalyst/trees/TreeNode.scala     |  6 +--
 .../org/apache/spark/sql/types/StructType.scala |  7 +--
 .../spark/sql/execution/ExistingRDD.scala       | 10 ++--
 .../spark/sql/execution/QueryExecution.scala    |  5 +-
 .../execution/aggregate/HashAggregateExec.scala |  7 +--
 .../execution/aggregate/SortAggregateExec.scala |  7 +--
 .../execution/datasources/LogicalRelation.scala |  3 +-
 .../org/apache/spark/sql/execution/limit.scala  |  5 +-
 .../execution/streaming/StreamExecution.scala   |  3 +-
 .../spark/sql/execution/streaming/memory.scala  |  3 +-
 .../benchmark/WideSchemaBenchmark.scala         | 49 ++++++++++++++++++++
 14 files changed, 140 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1a9dbca..f9d0540 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
 import javax.net.ssl.HttpsURLConnection
 
 import scala.annotation.tailrec
@@ -78,6 +79,52 @@ private[spark] object Utils extends Logging {
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
+  /**
+   * The performance overhead of creating and logging strings for wide schemas can be large. To
+   * limit the impact, we bound the number of fields to include by default. This can be overriden
+   * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv.
+   */
+  val DEFAULT_MAX_TO_STRING_FIELDS = 25
+
+  private def maxNumToStringFields = {
+    if (SparkEnv.get != null) {
+      SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS)
+    } else {
+      DEFAULT_MAX_TO_STRING_FIELDS
+    }
+  }
+
+  /** Whether we have warned about plan string truncation yet. */
+  private val truncationWarningPrinted = new AtomicBoolean(false)
+
+  /**
+   * Format a sequence with semantics similar to calling .mkString(). Any elements beyond
+   * maxNumToStringFields will be dropped and replaced by a "... N more fields" placeholder.
+   *
+   * @return the trimmed and formatted string.
+   */
+  def truncatedString[T](
+      seq: Seq[T],
+      start: String,
+      sep: String,
+      end: String,
+      maxNumFields: Int = maxNumToStringFields): String = {
+    if (seq.length > maxNumFields) {
+      if (truncationWarningPrinted.compareAndSet(false, true)) {
+        logWarning(
+          "Truncated the string representation of a plan since it was too large. This " +
+          "behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.")
+      }
+      val numFields = math.max(0, maxNumFields - 1)
+      seq.take(numFields).mkString(
+        start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
+    } else {
+      seq.mkString(start, sep, end)
+    }
+  }
+
+  /** Shorthand for calling truncatedString() without start or end strings. */
+  def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "")
 
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 6698749..a5363f0 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -40,6 +40,14 @@ import org.apache.spark.network.util.ByteUnit
 
 class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
+  test("truncatedString") {
+    assert(Utils.truncatedString(Nil, "[", ", ", "]", 2) == "[]")
+    assert(Utils.truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]")
+    assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]")
+    assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]")
+    assert(Utils.truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3")
+  }
+
   test("timeConversion") {
     // Test -1
     assert(Utils.timeStringAsSeconds("-1") === -1)

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index efe592d..10a1412 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 
////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the basic expression abstract classes in Catalyst.
@@ -196,7 +197,8 @@ abstract class Expression extends TreeNode[Expression] {
 
   override def simpleString: String = toString
 
-  override def toString: String = prettyName + flatArguments.mkString("(", ", ", ")")
+  override def toString: String = prettyName + Utils.truncatedString(
+    flatArguments.toSeq, "(", ", ", ")")
 
   /**
   * Returns SQL representation of this expression.  For expressions extending [[NonSQLExpression]],

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index c67366d..f924efe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -448,10 +448,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case tn: TreeNode[_] => tn.simpleString :: Nil
     case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil
     case iter: Iterable[_] if iter.isEmpty => Nil
-    case seq: Seq[_] => seq.mkString("[", ", ", "]") :: Nil
-    case set: Set[_] => set.mkString("{", ", ", "}") :: Nil
+    case seq: Seq[_] => Utils.truncatedString(seq, "[", ", ", "]") :: Nil
+    case set: Set[_] => Utils.truncatedString(set.toSeq, "{", ", ", "}") :: Nil
     case array: Array[_] if array.isEmpty => Nil
-    case array: Array[_] => array.mkString("[", ", ", "]") :: Nil
+    case array: Array[_] => Utils.truncatedString(array, "[", ", ", "]") :: Nil
     case null => Nil
     case None => Nil
     case Some(null) => Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 9a92373..436512f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -22,11 +22,12 @@ import scala.util.Try
 
 import org.json4s.JsonDSL._
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.util.Utils
 
 /**
  * :: DeveloperApi ::
@@ -293,8 +294,8 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
   override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
 
   override def simpleString: String = {
-    val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
-    s"struct<${fieldTypes.mkString(",")}>"
+    val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}")
+    Utils.truncatedString(fieldTypes, "struct<", ",", ">")
   }
 
   override def sql: String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index b8b3926..9ab98fd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.util.Utils
 
 object RDDConversions {
   def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
@@ -123,7 +124,7 @@ private[sql] case class RDDScanExec(
   }
 
   override def simpleString: String = {
-    s"Scan $nodeName${output.mkString("[", ",", "]")}"
+    s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}"
   }
 }
 
@@ -186,7 +187,8 @@ private[sql] case class RowDataSourceScanExec(
       key + ": " + StringUtils.abbreviate(value, 100)
     }
 
-    s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" 
", ", ", "")}"
+    s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" +
+      s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}"
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -239,8 +241,8 @@ private[sql] case class BatchedDataSourceScanExec(
     val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
       key + ": " + StringUtils.abbreviate(value, 100)
     }
-    val metadataStr = metadataEntries.mkString(" ", ", ", "")
-    s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr"
+    val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "")
+    s"Batched$nodeName${Utils.truncatedString(output, "[", ",", 
"]")}$metadataStr"
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 560214a..a2d4502 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCom
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
+import org.apache.spark.util.Utils
 
 /**
  * The primary workflow for executing relational queries using Spark.  Designed to allow easy
@@ -206,8 +207,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   }
 
   override def toString: String = {
-    def output =
-      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
+    def output = Utils.truncatedString(
+      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
     val analyzedPlan = Seq(
       stringOrError(output),
       stringOrError(analyzed.treeString(verbose = true))

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index b617e26..caeeba1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 import org.apache.spark.unsafe.KVIterator
+import org.apache.spark.util.Utils
 
 /**
  * Hash-based aggregate operator that can also fallback to sorting when data exceeds memory size.
@@ -773,9 +774,9 @@ case class HashAggregateExec(
 
     testFallbackStartsAt match {
       case None =>
-        val keyString = groupingExpressions.mkString("[", ",", "]")
-        val functionString = allAggregateExpressions.mkString("[", ",", "]")
-        val outputString = output.mkString("[", ",", "]")
+        val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]")
+        val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]")
+        val outputString = Utils.truncatedString(output, "[", ",", "]")
         if (verbose) {
           s"HashAggregate(key=$keyString, functions=$functionString, output=$outputString)"
         } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index 41ba9f5..1712651 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.util.Utils
 
 /**
  * Sort-based aggregate operator.
@@ -110,9 +111,9 @@ case class SortAggregateExec(
   private def toString(verbose: Boolean): String = {
     val allAggregateExpressions = aggregateExpressions
 
-    val keyString = groupingExpressions.mkString("[", ",", "]")
-    val functionString = allAggregateExpressions.mkString("[", ",", "]")
-    val outputString = output.mkString("[", ",", "]")
+    val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]")
+    val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]")
+    val outputString = Utils.truncatedString(output, "[", ",", "]")
     if (verbose) {
       s"SortAggregate(key=$keyString, functions=$functionString, 
output=$outputString)"
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 0e0748f..a418d02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.util.Utils
 
 /**
  * Used to link a [[BaseRelation]] in to a logical query plan.
@@ -82,5 +83,5 @@ case class LogicalRelation(
       expectedOutputAttributes,
       metastoreTableIdentifier).asInstanceOf[this.type]
 
-  override def simpleString: String = s"Relation[${output.mkString(",")}] $relation"
+  override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index b71f333..781c016 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.util.Utils
 
 
 /**
@@ -159,8 +160,8 @@ case class TakeOrderedAndProjectExec(
   override def outputOrdering: Seq[SortOrder] = sortOrder
 
   override def simpleString: String = {
-    val orderByString = sortOrder.mkString("[", ",", "]")
-    val outputString = output.mkString("[", ",", "]")
+    val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]")
+    val outputString = Utils.truncatedString(output, "[", ",", "]")
 
     s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, 
output=$outputString)"
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index d9800e4..954fc33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -336,7 +336,8 @@ class StreamExecution(
         newData.get(source).map { data =>
           val newPlan = data.logicalPlan
           assert(output.size == newPlan.output.size,
-            s"Invalid batch: ${output.mkString(",")} != 
${newPlan.output.mkString(",")}")
+            s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
+            s"${Utils.truncatedString(newPlan.output, ",")}")
           replacements ++= output.zip(newPlan.output)
           newPlan
         }.getOrElse {

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 4496f41..77fd043 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 object MemoryStream {
   protected val currentBlockId = new AtomicInteger(0)
@@ -81,7 +82,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     }
   }
 
-  override def toString: String = s"MemoryStream[${output.mkString(",")}]"
+  override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
 
   override def getOffset: Option[Offset] = synchronized {
     if (batches.isEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/00bbf787/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
index b4811fe..06466e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
@@ -155,6 +155,55 @@ many column field r/w:                   Best/Avg Time(ms)    Rate(M/s)   Per Ro
 */
   }
 
+  ignore("wide shallowly nested struct field read and write") {
+    val benchmark = new Benchmark(
+      "wide shallowly nested struct field r/w", scaleFactor)
+    for (width <- widthsToTest) {
+      val numRows = scaleFactor / width
+      var datum: String = "{"
+      for (i <- 1 to width) {
+        if (i == 1) {
+          datum += s""""value_$i": 1"""
+        } else {
+          datum += s""", "value_$i": 1"""
+        }
+      }
+      datum += "}"
+      datum = s"""{"a": {"b": {"c": $datum, "d": $datum}, "e": $datum}}"""
+      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+      df.count()  // force caching
+      addCases(benchmark, df, s"$width wide x $numRows rows", "a.b.c.value_1")
+    }
+    benchmark.run()
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Linux 4.2.0-36-generic
+Intel(R) Xeon(R) CPU E5-1650 v3 @ 3.50GHz
+wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+1 wide x 100000 rows (read in-mem)             100 /  125          1.0         997.7       1.0X
+1 wide x 100000 rows (write in-mem)            130 /  147          0.8        1302.9       0.8X
+1 wide x 100000 rows (read parquet)            195 /  228          0.5        1951.4       0.5X
+1 wide x 100000 rows (write parquet)           248 /  259          0.4        2479.7       0.4X
+10 wide x 10000 rows (read in-mem)              76 /   89          1.3         757.2       1.3X
+10 wide x 10000 rows (write in-mem)             90 /  116          1.1         900.0       1.1X
+10 wide x 10000 rows (read parquet)             90 /  135          1.1         903.9       1.1X
+10 wide x 10000 rows (write parquet)           222 /  240          0.4        2222.8       0.4X
+100 wide x 1000 rows (read in-mem)              71 /   91          1.4         710.8       1.4X
+100 wide x 1000 rows (write in-mem)            252 /  324          0.4        2522.4       0.4X
+100 wide x 1000 rows (read parquet)            310 /  329          0.3        3095.9       0.3X
+100 wide x 1000 rows (write parquet)           253 /  267          0.4        2525.7       0.4X
+1000 wide x 100 rows (read in-mem)             144 /  160          0.7        1439.5       0.7X
+1000 wide x 100 rows (write in-mem)           2055 / 2326          0.0       20553.9       0.0X
+1000 wide x 100 rows (read parquet)            750 /  925          0.1        7496.8       0.1X
+1000 wide x 100 rows (write parquet)           235 /  317          0.4        2347.5       0.4X
+2500 wide x 40 rows (read in-mem)              219 /  227          0.5        2190.9       0.5X
+2500 wide x 40 rows (write in-mem)            5177 / 5423          0.0       51773.2       0.0X
+2500 wide x 40 rows (read parquet)            1642 / 1714          0.1       16417.7       0.1X
+2500 wide x 40 rows (write parquet)            357 /  381          0.3        3568.2       0.3X
+*/
+  }
+
   ignore("wide struct field read and write") {
     val benchmark = new Benchmark("wide struct field r/w", scaleFactor)
     for (width <- widthsToTest) {

