GitHub user JoshRosen commented on the pull request:
https://github.com/apache/incubator-spark/pull/635#issuecomment-35967399
@mengxr For your examples (training multiple models in parallel and
broadcast join), why is `allCollect` better than directly using broadcast
variables? Why do you want to access your model/table through an RDD with a
single array value? It seems like you'd need to use something like
`zipPartitions()` to pair up the model with your data, so why not just directly
reference the broadcast variable in a `mapPartitions()` on your varying dataset?
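For contrast, here's a minimal sketch of the broadcast-variable pattern I have in mind: ship the model once as a broadcast variable and reference it directly inside `mapPartitions()`, with no single-element model RDD and no `zipPartitions()` pairing step. `LinearModel`, `score`, and the sample data below are made up purely for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical model type, just to make the example concrete.
case class LinearModel(weights: Array[Double]) {
  def score(x: Array[Double]): Double =
    weights.zip(x).map { case (w, v) => w * v }.sum
}

object BroadcastModelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-model-sketch"))

    val model = LinearModel(Array(0.1, 0.2, 0.3))
    val data  = sc.parallelize(Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)))

    // Ship the model to the executors once as a broadcast variable ...
    val modelBc = sc.broadcast(model)

    // ... and reference it directly inside mapPartitions, with no
    // single-element "model RDD" and no zipPartitions pairing step.
    val scores = data.mapPartitions { iter =>
      val m = modelBc.value
      iter.map(m.score)
    }

    scores.collect().foreach(println)
    sc.stop()
  }
}
```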
In the past, I've thought about implementing a
`SparkContext.broadcast[T](rdd: RDD[T]): Broadcast[T]` method for creating
broadcast variables directly from RDDs. This could be implemented through
peer-to-peer broadcasting of the RDD's partitions rather than broadcasting the
entire collected RDD from the driver, which avoids bottlenecking on the
driver's send bandwidth.
I tried to prototype efficient broadcasting of RDDs, but ran into some
difficulties. My implementation used its own Broadcast subclass that fetched
RDD partitions from remote block managers when trying to access the broadcast
variable's value. The problem is that we need to run some job on the
broadcasted RDD to generate the partitions that the Broadcast subclass will
fetch, and we need to ensure that those partitions are computed and stored
before we attempt to fetch them. Thus, any transformation that references a
BroadcastRDD variable must declare a dependency on the stage that produces
those blocks. Usually, RDDs declare their dependencies when they're
constructed; with BroadcastRDDs, we'd need to actually look inside the UDF to
find any references in order to build the proper lineage.
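Roughly, the difficulty looks like the sketch below (illustrative only; `RddBackedBroadcast` and `score` are made-up names, not the prototype's actual classes): the only link between the downstream job and the broadcast RDD is a reference captured inside the closure, which ordinary lineage never sees.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Stand-in for the prototype's Broadcast subclass: `value` is meant to fetch
// the RDD's stored partition blocks from remote block managers.
class RddBackedBroadcast[T](@transient private val rdd: RDD[T]) extends Serializable {
  def value: Seq[T] = sys.error("the prototype would fetch rdd's stored blocks here")
}

object BroadcastRddLineageSketch {
  def score(weights: Seq[Double], x: Double): Double = weights.map(_ * x).sum

  def run(sc: SparkContext): Unit = {
    val modelRdd = sc.parallelize(Seq(0.1, 0.2, 0.3))
    val data     = sc.parallelize(1 to 100)

    val modelBc = new RddBackedBroadcast(modelRdd)

    // The only connection between `result` and `modelRdd` is the `modelBc`
    // reference captured in this closure. The scheduler can't tell from the
    // declared dependencies that it must first run a job to compute and store
    // modelRdd's partitions; discovering that would mean inspecting the UDF.
    val result = data.map(x => score(modelBc.value, x))

    // Running result.count() here would fail: nothing has materialized and
    // stored modelRdd's partitions, and lineage gave the scheduler no hint.
  }
}
```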
On a related note, it would be nice to implement an `allReduce` function
that's equivalent to `reduce().broadcast()` but performs similar optimizations
to avoid coordinating through the driver.
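For reference, here's the naive version of that idea, sketched under the assumption that `allReduce` would hand back the same `Broadcast` value; the real primitive would combine partial results peer-to-peer instead of funneling them through the driver. (`allReduce` and `naiveAllReduce` are proposals/illustrations, not existing APIs.)

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

object AllReduceSketch {
  // What `reduce().broadcast()` amounts to today: partial results funnel
  // through the driver, which then ships the combined value back out. An
  // allReduce primitive would produce the same Broadcast[T] without the
  // round trip through the driver.
  def naiveAllReduce[T: ClassTag](sc: SparkContext, rdd: RDD[T], f: (T, T) => T): Broadcast[T] = {
    val reduced = rdd.reduce(f)
    sc.broadcast(reduced)
  }
}
```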