Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22219#discussion_r215115685 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3237,6 +3238,28 @@ class Dataset[T] private[sql]( files.toSet.toArray } + /** + * Returns the tuple of the row count and a SeqView that contains all rows in this Dataset. + * + * The SeqView will consume as much memory as the total size of serialized results which can be + * limited with the config 'spark.driver.maxResultSize'. Rows are deserialized when iterating rows + * with the iterator of the returned SeqView. Whether to collect all deserialized rows or to iterate them + * incrementally can be decided by considering the total row count and driver memory. + */ + private[sql] def collectCountAndSeqView(): (Long, SeqView[T, Array[T]]) = + withAction("collectCountAndSeqView", queryExecution) { plan => + // This projection writes output to an `InternalRow`, which means applying this projection is + // not thread-safe. Here we create the projection inside this method to make `Dataset` + // thread-safe. + val objProj = GenerateSafeProjection.generate(deserializer :: Nil) + val (totalRowCount, internalRowsView) = plan.executeCollectSeqView() + (totalRowCount, internalRowsView.map { row => + // The row returned by SafeProjection is `SpecificInternalRow`, which ignores the data type + // parameter of its `get` method, so it's safe to use null here. + objProj(row).get(0, null).asInstanceOf[T] + }.asInstanceOf[SeqView[T, Array[T]]]) + } --- End diff -- how about changing private to private[sql], then implementing this based on the deserializer?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org