eejbyfeldt commented on code in PR #38428: URL: https://github.com/apache/spark/pull/38428#discussion_r1010340881
########## core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala: ########## @@ -301,15 +300,18 @@ class KryoDeserializationStream( private[this] var kryo: Kryo = serInstance.borrowKryo() + final private[this] def hasNext: Boolean = { Review Comment: Yes, will fix. ########## core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala: ########## @@ -324,6 +326,36 @@ class KryoDeserializationStream( } } } + + final override def asIterator: Iterator[Any] = new NextIterator[Any] { + override protected def getNext() = { + if (KryoDeserializationStream.this.hasNext) { + readValue[Any]() + } else { + finished = true + null + } + } + + override protected def close(): Unit = { + KryoDeserializationStream.this.close() + } + } + + final override def asKeyValueIterator: Iterator[(Any, Any)] = new NextIterator[(Any, Any)] { + override protected def getNext() = { + if (KryoDeserializationStream.this.hasNext) { + (readKey[Any](), readValue[Any]()) Review Comment: You mean that if only a key exists we just ignore it like the current implementation would? ########## core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala: ########## @@ -301,15 +300,18 @@ class KryoDeserializationStream( private[this] var kryo: Kryo = serInstance.borrowKryo() + final private[this] def hasNext: Boolean = { + if (input == null) { + return false + } + + val eof = input.eof() + if (eof) close() + !eof + } + override def readObject[T: ClassTag](): T = { - try { kryo.readClassAndObject(input).asInstanceOf[T] - } catch { - // DeserializationStream uses the EOF exception to indicate stopping condition. - case e: KryoException - if e.getMessage.toLowerCase(Locale.ROOT).contains("buffer underflow") => - throw new EOFException - } Review Comment: Sure, will add it back. I think that catching and ignoring the exceptions here should be revisited in some other change, as it seems to me like it could cause data loss, since we just assume the exception here means EOF. 
########## core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala: ########## @@ -504,44 +505,31 @@ class ExternalAppendOnlyMap[K, V, C]( * If no more pairs are left, return null. */ private def readNextItem(): (K, C) = { - try { - val k = deserializeStream.readKey().asInstanceOf[K] - val c = deserializeStream.readValue().asInstanceOf[C] - val item = (k, c) - objectsRead += 1 - if (objectsRead == serializerBatchSize) { - objectsRead = 0 - deserializeStream = nextBatchStream() - } - item - } catch { - case e: EOFException => - cleanup() - null + val next = batchIterator.next() + objectsRead += 1 + if (objectsRead == serializerBatchSize) { + objectsRead = 0 + batchIterator = nextBatchIterator() } + next } override def hasNext: Boolean = { - if (nextItem == null) { - if (deserializeStream == null) { - // In case of deserializeStream has not been initialized - deserializeStream = nextBatchStream() - if (deserializeStream == null) { - return false - } + if (batchIterator == null) { + // In case of batchIterator has not been initialized + batchIterator = nextBatchIterator() + if (batchIterator == null) { + return false } - nextItem = readNextItem() } - nextItem != null + batchIterator.hasNext } override def next(): (K, C) = { - if (!hasNext) { + if (batchIterator == null) { Review Comment: In that case it will call next on the empty iterator and we should still throw a `NoSuchElementException`. But `!hasNext` should also have that behavior, so I can change back to that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org