Copilot commented on code in PR #12403:
URL: https://github.com/apache/gluten/pull/12403#discussion_r3498495378


##########
backends-velox/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleReader.scala:
##########
@@ -91,26 +91,19 @@ class ColumnarShuffleReader[K, C](
       fetchContinuousBlocksInBatch
     ).toCompletionIterator
 
-    val recordIter = dep match {
-      case columnarDep: ColumnarShuffleDependency[K, _, C] =>
-        // If the dependency is a ColumnarShuffleDependency, we use the 
columnar serializer.
-        columnarDep.serializer
-          .newInstance()
-          .asInstanceOf[ColumnarBatchSerializerInstance]
-          .deserializeStreams(wrappedStreams)
-          .asKeyValueIterator
-      case _ =>
-        val serializerInstance = dep.serializer.newInstance()
-        // Create a key/value iterator for each stream
-        wrappedStreams.flatMap {
-          case (blockId, wrappedStream) =>
-            // Note: the asKeyValueIterator below wraps a key/value iterator 
inside of a
-            // NextIterator. The NextIterator makes sure that close() is 
called on the
-            // underlying InputStream when all records have been read.
-            
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
-        }
+    val columnarDep = dep match {
+      case d: ColumnarShuffleDependency[_, _, _] =>

Review Comment:
   Dependency validation happens after defining wrappedStreams; with the new 
hard requirement on ColumnarShuffleDependency, it’s better to avoid 
constructing the ShuffleBlockFetcherIterator at all when the dependency is 
invalid. Making wrappedStreams lazy ensures we fail fast without setting up 
block fetching in the error case.



##########
backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/perf/GlutenDeltaOptimizedWriterExec.scala:
##########
@@ -338,25 +338,18 @@ private class GlutenOptimizedWriterShuffleReader(
     ).toCompletionIterator
 
     // Create a key/value iterator for each stream
-    val recordIter = dep match {
-      case columnarDep: ColumnarShuffleDependency[Int, ColumnarBatch, 
ColumnarBatch] =>
-        // If the dependency is a ColumnarShuffleDependency, we use the 
columnar serializer.
-        columnarDep.serializer
-          .newInstance()
-          .asInstanceOf[ColumnarBatchSerializerInstance]
-          .deserializeStreams(wrappedStreams)
-          .asKeyValueIterator
-      case _ =>
-        val serializerInstance = dep.serializer.newInstance()
-        // Create a key/value iterator for each stream
-        wrappedStreams.flatMap {
-          case (blockId, wrappedStream) =>
-            // Note: the asKeyValueIterator below wraps a key/value iterator 
inside of a
-            // NextIterator. The NextIterator makes sure that close() is 
called on the
-            // underlying InputStream when all records have been read.
-            
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
-        }
+    val columnarDep = dep match {

Review Comment:
   This reader now unconditionally requires ColumnarShuffleDependency, but 
wrappedStreams is created before the dependency check. Making wrappedStreams 
lazy avoids setting up ShuffleBlockFetcherIterator in the invalid-dependency 
case (the exception will be thrown before wrappedStreams is evaluated).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to