Seems like a good new API to add?
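For context, the "empty foreach after the map" workaround Cody describes below can be sketched outside Spark in plain Python. This is a toy model, not Spark code: `map_partitions_with_index` and `side_effect` are hypothetical stand-ins that mimic the lazy `mapPartitionsWithIndex` + no-op `foreach` pattern.

```python
# Toy model (not Spark): emulate foreachPartitionWithIndex by mapping with
# the partition index, then forcing the lazy map with an empty foreach pass.
partitions = [[1, 2], [3, 4]]   # stand-in for an RDD's partitions

seen = []

def map_partitions_with_index(parts, f):
    # Lazily apply f(index, iterator) to each partition,
    # like RDD.mapPartitionsWithIndex.
    return (f(i, iter(p)) for i, p in enumerate(parts))

def side_effect(index, it):
    for x in it:
        seen.append((index, x))
    return iter(())  # emit nothing; we only wanted the side effect

# The "empty foreach after the map": consume the result to trigger execution.
for part in map_partitions_with_index(partitions, side_effect):
    for _ in part:
        pass

print(seen)  # [(0, 1), (0, 2), (1, 3), (1, 4)]
```

A dedicated `foreachPartitionWithIndex` would collapse the map-then-consume dance into a single call, which is the API being proposed.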
On Thu, Oct 20, 2016 at 11:14 AM, Cody Koeninger <c...@koeninger.org> wrote:
> Access to the partition ID is necessary for basically every single one
> of my jobs, and there isn't a foreachPartitionWithIndex equivalent.
> You can kind of work around it with an empty foreach after the map, but
> it's really awkward to explain to people.
>
> On Thu, Oct 20, 2016 at 12:52 PM, Reynold Xin <r...@databricks.com> wrote:
> > FYI - Xiangrui submitted an amazing pull request to fix a long-standing
> > issue with a lot of the nondeterministic expressions (rand, randn,
> > monotonically_increasing_id): https://github.com/apache/spark/pull/15567
> >
> > Prior to this PR, we were using TaskContext.partitionId as the partition
> > index when initializing expressions. However, that is actually not a good
> > index to use in most cases, because it is the physical task's partition id
> > and does not always reflect the partition index at the time the RDD is
> > created (or in the Spark SQL physical plan). This makes a big difference
> > once there is a union or coalesce operation.
> >
> > The "index" given by mapPartitionsWithIndex, on the other hand, does not
> > have this problem, because it actually reflects the logical partition index
> > at the time the RDD is created.
> >
> > When is it safe to use TaskContext.partitionId? It is safe at the very end
> > of a query plan (the root node), because there partitionId is guaranteed,
> > based on the current implementation, to be the same as the physical task
> > partition id.
> >
> > For example, prior to Xiangrui's PR, the following query would return 2
> > rows, whereas the correct behavior is 1 row:
> >
> > spark.range(1).selectExpr("rand(1)").union(spark.range(1).selectExpr("rand(1)")).distinct.show()
> >
> > The reason it would return 2 rows is that rand was using
> > TaskContext.partitionId as the per-partition seed, and as a result the two
> > sides of the union were using different seeds.
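The seeding problem described above can be simulated without Spark. The `seed + partition index` scheme below is an assumption about how the per-partition seed is derived, used purely for illustration; the point is only that physical and logical indexing diverge after a union.

```python
import random

def rand_value(seed, partition_index):
    # Assumed Spark-like scheme: per-partition seed = user seed + partition
    # index. Illustration only, not the actual Spark implementation.
    return random.Random(seed + partition_index).random()

# Physical task partition ids after the union: 0 and 1 -> different seeds,
# so rand(1) produces two distinct values (the buggy 2-row result).
physical = {rand_value(1, 0), rand_value(1, 1)}

# Logical partition index at RDD creation: both sides of the union are
# partition 0 -> same seed, so distinct keeps one row (the correct result).
logical = {rand_value(1, 0), rand_value(1, 0)}

print(len(physical))  # 2
print(len(logical))   # 1
```

This mirrors why the query above returned 2 rows before the fix and 1 row after it.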
> >
> > I'm starting to think we should deprecate the API and ban the use of it
> > within the project to be safe ...