Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22219#discussion_r214785788 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3237,6 +3238,28 @@ class Dataset[T] private[sql]( files.toSet.toArray } + /** + * Returns the tuple of the row count and a SeqView that contains all rows in this Dataset. + * + * The SeqView will consume as much memory as the total size of serialized results, which can be + * limited with the config 'spark.driver.maxResultSize'. Rows are deserialized when iterating rows + * with the iterator of the returned SeqView. Whether to collect all deserialized rows or to iterate them + * incrementally can be decided by considering the total row count and driver memory. + */ + private[sql] def collectCountAndSeqView(): (Long, SeqView[T, Array[T]]) = + withAction("collectCountAndSeqView", queryExecution) { plan => + // This projection writes output to an `InternalRow`, which means applying this projection is + // not thread-safe. Here we create the projection inside this method to make `Dataset` + // thread-safe. + val objProj = GenerateSafeProjection.generate(deserializer :: Nil) + val (totalRowCount, internalRowsView) = plan.executeCollectSeqView() + (totalRowCount, internalRowsView.map { row => + // The row returned by SafeProjection is `SpecificInternalRow`, which ignores the data type + // parameter of its `get` method, so it's safe to use null here. + objProj(row).get(0, null).asInstanceOf[T] + }.asInstanceOf[SeqView[T, Array[T]]]) + } --- End diff -- If this is a thriftserver-specific issue, can we do the same thing by fixing code only in the thriftserver package? IMHO we should avoid modifying code in the sql package as much as possible.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org