anishshri-db commented on code in PR #47641:
URL: https://github.com/apache/spark/pull/47641#discussion_r1706212242


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -198,7 +204,29 @@ case class TransformWithStateExec(
         getOutputRow(obj)
       }
     ImplicitGroupingKeyTracker.removeImplicitKey()
-    mappedIterator
+
+    // Wrapper to ensure that the implicit key is set when the methods on the iterator
+    // are called. Inside of processNewData, we use a GroupedIterator, so handleInputRows
+    // is only called once per key. As such, we don't strictly need to set/unset the
+    // implicit key on every call to next(); we just need to set it on the first call
+    // to hasNext and unset it after the last call to next. However, such an optimization
+    // relies on knowing this implementation detail of processNewData, so we set/unset the
+    // implicit key on every call to a mappedIterator's public methods to be safe.
+    new Iterator[InternalRow] {
+      override def hasNext: Boolean = {
+        ImplicitGroupingKeyTracker.setImplicitKey(keyObj)

Review Comment:
   Is it possible to set this and remove this once for the iterator lifetime?
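
   To make the trade-off in the code comment concrete, here is a minimal sketch of the two options. It is not Spark code: `KeyTracker` is a hypothetical thread-local stand-in for `ImplicitGroupingKeyTracker`, and `perCall`, `oncePerLifetime` and `tagged` are names invented for illustration. The sketch assumes the underlying iterator is lazy and reads the key while producing each element, as the mapped iterator in the diff appears to do.

```scala
object IteratorKeySketch {
  // Hypothetical stand-in for ImplicitGroupingKeyTracker (an assumption, not Spark's class).
  object KeyTracker {
    private val key = new ThreadLocal[AnyRef]
    def set(k: AnyRef): Unit = key.set(k)
    def remove(): Unit = key.remove()
    def current: Option[AnyRef] = Option(key.get())
  }

  // Option A, as in the diff: set and unset the key around every public call.
  def perCall[T](keyObj: AnyRef, underlying: Iterator[T]): Iterator[T] =
    new Iterator[T] {
      private def withKey[R](body: => R): R = {
        KeyTracker.set(keyObj)
        try body finally KeyTracker.remove()
      }
      override def hasNext: Boolean = withKey(underlying.hasNext)
      override def next(): T = withKey(underlying.next())
    }

  // Option B, as the review asks: set the key on hasNext and unset it only
  // when the underlying iterator reports exhaustion.
  def oncePerLifetime[T](keyObj: AnyRef, underlying: Iterator[T]): Iterator[T] =
    new Iterator[T] {
      private var done = false
      override def hasNext: Boolean =
        if (done) false
        else {
          KeyTracker.set(keyObj)
          val more = underlying.hasNext
          if (!more) { KeyTracker.remove(); done = true }
          more
        }
      // Relies on the key still being set from the preceding hasNext call.
      override def next(): T = underlying.next()
    }

  // Lazy iterator whose elements record whichever key is set when they are produced.
  def tagged(n: Int): Iterator[String] =
    Iterator.range(0, n).map { i =>
      val k = KeyTracker.current.getOrElse("none")
      s"$k-$i"
    }
}
```

   In this sketch both wrappers behave the same when each iterator is drained before the next one is touched. Option B only differs when that assumption breaks: if the consumer stops early the key stays set, and if two wrapped iterators are interleaved, the second `hasNext` overwrites the key that the first iterator's `next()` will read. That is the dependence on `processNewData` handling one key at a time that the code comment describes.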