Kimahriman commented on code in PR #52303:
URL: https://github.com/apache/spark/pull/52303#discussion_r2343870388
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInBatchExec.scala:
##########
@@ -82,17 +83,31 @@ trait FlatMapGroupsInBatchExec extends SparkPlan with UnaryExecNode with PythonS
val data = groupedData(iter, dedupAttributes)
- val runner = new ArrowPythonRunner(
- chainedFunc,
- pythonEvalType,
- Array(argOffsets),
- groupedSchema(dedupAttributes),
- sessionLocalTimeZone,
- largeVarTypes,
- pythonRunnerConf,
- pythonMetrics,
- jobArtifactUUID,
- conf.pythonUDFProfiler)
+ val runner = if (arrowBatchSlicingEnabled) {
Review Comment:
What's the use case for making this configurable? Why would you not want
this enabled? It seems like making it configurable adds a lot of unnecessary
complexity.
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowInput.scala:
##########
@@ -194,3 +180,118 @@ private[python] trait BatchedPythonArrowInput extends BasicPythonArrowInput {
}
}
}
+
+object BatchedPythonArrowInput {
Review Comment:
Can `BaseStreamingArrowWriter` be updated/used instead of implementing a new
version of something similar? It's already used for `applyInPandasWithState` and
`transformWithState` to provide basically the same behavior, which is why I
reused it in my PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]