anishshri-db commented on code in PR #47641:
URL: https://github.com/apache/spark/pull/47641#discussion_r1706212242


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -198,7 +204,29 @@ case class TransformWithStateExec(
       getOutputRow(obj)
     }
     ImplicitGroupingKeyTracker.removeImplicitKey()
-    mappedIterator
+
+    // Wrapper to ensure that the implicit key is set when the methods on the iterator
+    // are called. Inside of processNewData, we use a GroupedIterator, so handleInputRows
+    // is only called once per key. As such, we don't strictly need to set/unset the
+    // implicit key on every call to next(); we just need to set it on the first call
+    // to hasNext and unset it after the last call to next. However, such an optimization
+    // relies on knowing this implementation detail of processNewData, so we set/unset the
+    // implicit key on every call to a mappedIterator's public methods to be safe.
+    new Iterator[InternalRow] {
+      override def hasNext: Boolean = {
+        ImplicitGroupingKeyTracker.setImplicitKey(keyObj)

Review Comment:
   Is it possible to set the implicit key once and remove it once over the iterator's lifetime, instead of on every call?
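
   To make the trade-off concrete, here is a minimal, self-contained sketch of the two options. It is not the PR's code: `KeyTracker` is a hypothetical thread-local stand-in for `ImplicitGroupingKeyTracker` (whose internals are not shown in this hunk), and `PerCallIterator` / `LifetimeIterator` are illustrative names. The `try`/`finally` in the per-call variant is also an assumption.

```scala
// Hypothetical stand-in for ImplicitGroupingKeyTracker; not Spark's implementation.
object KeyTracker {
  private val current = new ThreadLocal[Any]
  def set(key: Any): Unit = current.set(key)
  def remove(): Unit = current.remove()
  def get: Option[Any] = Option(current.get())
}

// Option A: set and unset the key around every public call.
// Correct regardless of how the caller interleaves iterators.
class PerCallIterator[T](key: Any, underlying: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean = {
    KeyTracker.set(key)
    try underlying.hasNext finally KeyTracker.remove()
  }
  override def next(): T = {
    KeyTracker.set(key)
    try underlying.next() finally KeyTracker.remove()
  }
}

// Option B: set the key on the first hasNext and remove it once the
// underlying iterator is exhausted. Relies on the caller fully draining
// one iterator before touching the next one.
class LifetimeIterator[T](key: Any, underlying: Iterator[T]) extends Iterator[T] {
  private var started = false
  private var finished = false

  override def hasNext: Boolean = {
    if (finished) {
      false
    } else {
      if (!started) {
        KeyTracker.set(key)
        started = true
      }
      val more = underlying.hasNext
      if (!more) {
        KeyTracker.remove()
        finished = true
      }
      more
    }
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("iterator exhausted")
    underlying.next()
  }
}
```

   In this sketch, Option B does less work per row, but the key stays set if the consumer abandons the iterator before exhausting it, and it would be wrong if two such iterators were consumed in an interleaved fashion on the same thread. Option A avoids both problems at the cost of a set/remove on each call.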



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

