Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/2378#discussion_r17631575 --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala --- @@ -775,17 +775,38 @@ private[spark] object PythonRDD extends Logging { }.toJavaRDD() } + private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] { + private val pickle = new Pickler() + private var batch = 1 + private val buffer = new mutable.ArrayBuffer[Any] + + override def hasNext(): Boolean = iter.hasNext + + override def next(): Array[Byte] = { + while (iter.hasNext && buffer.length < batch) { + buffer += iter.next() + } + val bytes = pickle.dumps(buffer.toArray) + val size = bytes.length + // let 1M < size < 10M + if (size < 1024 * 100) { + batch = (1024 * 100) / size // fast grow --- End diff -- Good question. Without this fast path, `batch` may need to grow 15 times to become stable, it's good and safer. I will remove this fast path.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org