eejbyfeldt commented on code in PR #38428:
URL: https://github.com/apache/spark/pull/38428#discussion_r1010340881


##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -301,15 +300,18 @@ class KryoDeserializationStream(
 
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
+  final private[this] def hasNext: Boolean = {

Review Comment:
   Yes, will fix.



##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -324,6 +326,36 @@ class KryoDeserializationStream(
       }
     }
   }
+
+  final override def asIterator: Iterator[Any] = new NextIterator[Any] {
+    override protected def getNext() = {
+      if (KryoDeserializationStream.this.hasNext) {
+        readValue[Any]()
+      } else {
+        finished = true
+        null
+      }
+    }
+
+    override protected def close(): Unit = {
+      KryoDeserializationStream.this.close()
+    }
+  }
+
+  final override def asKeyValueIterator: Iterator[(Any, Any)] = new NextIterator[(Any, Any)] {
+    override protected def getNext() = {
+      if (KryoDeserializationStream.this.hasNext) {
+        (readKey[Any](), readValue[Any]())

Review Comment:
   You mean that if only a key exists, we just ignore it like the current implementation would?
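
   To make the question concrete, here is roughly the pair read in question; if the stream ends right after `readKey`, what happens to the dangling key depends on what `readValue` does at end of input (hypothetical fragment for discussion, not the PR code verbatim):

   ```scala
   override protected def getNext(): (Any, Any) = {
     if (KryoDeserializationStream.this.hasNext) {
       val key = readKey[Any]()      // succeeds even if no value follows
       val value = readValue[Any]()  // end of input would surface here
       (key, value)
     } else {
       finished = true
       null
     }
   }
   ```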



##########
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala:
##########
@@ -301,15 +300,18 @@ class KryoDeserializationStream(
 
   private[this] var kryo: Kryo = serInstance.borrowKryo()
 
+  final private[this] def hasNext: Boolean = {
+    if (input == null) {
+      return false
+    }
+
+    val eof = input.eof()
+    if (eof) close()
+    !eof
+  }
+
   override def readObject[T: ClassTag](): T = {
-    try {
       kryo.readClassAndObject(input).asInstanceOf[T]
-    } catch {
-      // DeserializationStream uses the EOF exception to indicate stopping condition.
-      case e: KryoException
-        if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow") =>
-        throw new EOFException
-    }

Review Comment:
   Sure, will add it back. I think that catching and ignoring the exceptions here should be revisited in some other change, as it seems to me it could cause data loss to just assume that the exception here means EOF.
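
   For reference, a rough sketch of what adding it back would look like, following the removed lines above (the exact final form may differ):

   ```scala
   override def readObject[T: ClassTag](): T = {
     try {
       kryo.readClassAndObject(input).asInstanceOf[T]
     } catch {
       // DeserializationStream uses the EOF exception to indicate the stopping
       // condition, so a Kryo "buffer underflow" is translated into EOFException.
       case e: KryoException
           if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow") =>
         throw new EOFException
     }
   }
   ```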



##########
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala:
##########
@@ -504,44 +505,31 @@ class ExternalAppendOnlyMap[K, V, C](
      * If no more pairs are left, return null.
      */
     private def readNextItem(): (K, C) = {
-      try {
-        val k = deserializeStream.readKey().asInstanceOf[K]
-        val c = deserializeStream.readValue().asInstanceOf[C]
-        val item = (k, c)
-        objectsRead += 1
-        if (objectsRead == serializerBatchSize) {
-          objectsRead = 0
-          deserializeStream = nextBatchStream()
-        }
-        item
-      } catch {
-        case e: EOFException =>
-          cleanup()
-          null
+      val next = batchIterator.next()
+      objectsRead += 1
+      if (objectsRead == serializerBatchSize) {
+        objectsRead = 0
+        batchIterator = nextBatchIterator()
       }
+      next
     }
 
     override def hasNext: Boolean = {
-      if (nextItem == null) {
-        if (deserializeStream == null) {
-          // In case of deserializeStream has not been initialized
-          deserializeStream = nextBatchStream()
-          if (deserializeStream == null) {
-            return false
-          }
+      if (batchIterator == null) {
+        // In case of batchIterator has not been initialized
+        batchIterator = nextBatchIterator()
+        if (batchIterator == null) {
+          return false
         }
-        nextItem = readNextItem()
       }
-      nextItem != null
+      batchIterator.hasNext
     }
 
     override def next(): (K, C) = {
-      if (!hasNext) {
+      if (batchIterator == null) {

Review Comment:
   In that case it will call next on the empty iterator, and we should still get a `NoSuchElementException`. But `!hasNext` should also have that behavior, so I can change it back to that.
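
   Roughly what changing it back would look like, assuming `readNextItem` keeps its new shape (sketch, not the exact final code):

   ```scala
   override def next(): (K, C) = {
     if (!hasNext) {
       // Covers both a null batchIterator and an exhausted one.
       throw new NoSuchElementException
     }
     readNextItem()
   }
   ```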



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

