viirya commented on code in PR #50080:
URL: https://github.com/apache/spark/pull/50080#discussion_r1971054365
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala:
##########
@@ -219,17 +218,13 @@ case class MapPartitionsInRWithArrowExec(
child: SparkPlan) extends UnaryExecNode {
override def producedAttributes: AttributeSet = AttributeSet(output)
- private val batchSize = conf.arrowMaxRecordsPerBatch
-
override def outputPartitioning: Partitioning = child.outputPartitioning
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsInternal { inputIter =>
val outputTypes = schema.map(_.dataType)
- // DO NOT use iter.grouped(). See BatchIterator.
- val batchIter =
- if (batchSize > 0) new BatchIterator(inputIter, batchSize) else Iterator(inputIter)
+ val batchIter = Iterator(inputIter)
Review Comment:
Like `ArrowPythonWithNamedArgumentRunner`, which now batches rows
internally via `BatchedPythonArrowInput`, but I don't see anything
similar in `ArrowRRunner`. Is that intentional?
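For context on the removed `// DO NOT use iter.grouped(). See BatchIterator.` comment: `iter.grouped(n)` eagerly materializes each batch into a `Seq`, whereas a `BatchIterator`-style wrapper yields lazy sub-iterators that pull rows from the underlying iterator on demand. The sketch below is a hypothetical, simplified illustration of that pattern (not Spark's actual `BatchIterator` from `ArrowConverters`), and it assumes each sub-iterator is fully consumed before the next batch is requested.

```scala
// Minimal sketch of a lazy batching iterator. Unlike iter.grouped(n),
// no batch is buffered into a collection: each next() returns a view
// that consumes up to `batchSize` rows from the shared source iterator.
class BatchIterator[T](iter: Iterator[T], batchSize: Int)
    extends Iterator[Iterator[T]] {

  override def hasNext: Boolean = iter.hasNext

  override def next(): Iterator[T] = {
    if (!hasNext) throw new NoSuchElementException
    new Iterator[T] {
      private var remaining = batchSize
      override def hasNext: Boolean = remaining > 0 && iter.hasNext
      override def next(): T = {
        if (!hasNext) throw new NoSuchElementException
        remaining -= 1
        iter.next()  // rows are pulled from the source only on demand
      }
    }
  }
}
```

Because the sub-iterators share the underlying iterator, interleaving consumption of two batches would produce wrong results; Spark's real implementation guards against that, which is why the code comment warns against swapping in `grouped()`.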
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]