Kimahriman commented on code in PR #52303:
URL: https://github.com/apache/spark/pull/52303#discussion_r2343870388


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala:
##########
@@ -82,17 +83,31 @@ trait FlatMapGroupsInBatchExec extends SparkPlan with 
UnaryExecNode with PythonS
 
       val data = groupedData(iter, dedupAttributes)
 
-      val runner = new ArrowPythonRunner(
-        chainedFunc,
-        pythonEvalType,
-        Array(argOffsets),
-        groupedSchema(dedupAttributes),
-        sessionLocalTimeZone,
-        largeVarTypes,
-        pythonRunnerConf,
-        pythonMetrics,
-        jobArtifactUUID,
-        conf.pythonUDFProfiler)
+      val runner = if (arrowBatchSlicingEnabled) {

Review Comment:
   What's the use case for making this configurable? Why would you not want 
this enabled? It seems like making it configurable adds a lot of unnecessary 
complexity. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala:
##########
@@ -194,3 +180,118 @@ private[python] trait BatchedPythonArrowInput extends 
BasicPythonArrowInput {
     }
   }
 }
+
+object BatchedPythonArrowInput {

Review Comment:
   Can `BaseStreamingArrowWriter` be updated/used instead of implementing a new 
version of something similar? It's already used for applyInPandasWithState and 
transformWithState to provide basically the same behavior, which is why I 
reused it in my PR



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to