viirya commented on a change in pull request #26809: [SPARK-30185][SQL] Implement Dataset.tail API
URL: https://github.com/apache/spark/pull/26809#discussion_r360628682
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
 ##########
 @@ -309,20 +309,41 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
    * compressed.
    */
-  private def getByteArrayRdd(n: Int = -1): RDD[(Long, Array[Byte])] = {
+  private def getByteArrayRdd(n: Int = -1, reverse: Boolean = false): RDD[(Long, Array[Byte])] = {
     execute().mapPartitionsInternal { iter =>
       var count = 0
       val buffer = new Array[Byte](4 << 10)  // 4K
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
       val bos = new ByteArrayOutputStream()
       val out = new DataOutputStream(codec.compressedOutputStream(bos))
-      // `iter.hasNext` may produce one row and buffer it, we should only call it when the limit is
-      // not hit.
-      while ((n < 0 || count < n) && iter.hasNext) {
-        val row = iter.next().asInstanceOf[UnsafeRow]
-        out.writeInt(row.getSizeInBytes)
-        row.writeToStream(out, buffer)
-        count += 1
+
+      if (reverse) {
+        // To collect the last n rows, we have to read the whole input anyway while keeping the
+        // last n rows buffered; otherwise we cannot tell where the end of the iterator is.
+        var last: Seq[UnsafeRow] = Seq.empty[UnsafeRow]
+        if (n > 0) {
+          // NOTE: n = -1 is not supported when `reverse` is true; it would be no different from
+          // collecting everything and reversing it.
+          val slidingIter = iter.map(_.copy()).sliding(n)
+          while (slidingIter.hasNext) { last = slidingIter.next().asInstanceOf[Seq[UnsafeRow]] }
 
 Review comment:
   do we need to copy rows if they are not in the last sliding window?
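   As a possible illustration (not code from this PR): the row returned by the iterator is
   typically a reused UnsafeRow, so any row we keep buffered has to be copied at the moment it
   enters the buffer; we just cannot know in advance which rows will end up in the final window.
   A minimal sketch of that idea with a bounded queue, where `lastN` is a hypothetical helper:

```scala
import scala.collection.mutable

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Keep only the last n rows from the iterator. Every buffered row is copied because the
// iterator may reuse the same UnsafeRow instance, but at most n copies are retained at a
// time; copies evicted from the queue become garbage.
def lastN(iter: Iterator[UnsafeRow], n: Int): Seq[UnsafeRow] = {
  val buffer = mutable.Queue.empty[UnsafeRow]
  while (iter.hasNext) {
    if (buffer.size == n) {
      buffer.dequeue()                   // drop the oldest retained copy
    }
    buffer.enqueue(iter.next().copy())   // copy before buffering, the source row may be reused
  }
  buffer.toSeq
}
```

   Compared with `iter.map(_.copy()).sliding(n)`, this still copies every row, but it may avoid
   building a new window collection for every step of the sliding iterator and makes the n-row
   memory bound explicit.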
