FYI - Xiangrui submitted an amazing pull request to fix a long-standing
issue with many of the nondeterministic expressions (rand, randn,
monotonically_increasing_id): https://github.com/apache/spark/pull/15567

Prior to this PR, we were using TaskContext.partitionId as the partition
index when initializing these expressions. However, that is not a good
index to use in most cases, because it is the physical task's partition id
and does not always reflect the logical partition index at the time the RDD
is created (or in the Spark SQL physical plan). This makes a big difference
once there is a union or coalesce operation.

The "index" given by mapPartitionsWithIndex, on the other hand, does not
have this problem because it actually reflects the logical partition index
at the time the RDD is created.
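
To make that concrete, here is a minimal sketch (my own illustration, not
code from the PR; the exact output interleaving may vary): each leg of a
union keeps its own logical indices, while the physical task ids keep
counting up across the union.

import org.apache.spark.TaskContext

// Two partitions; record both the logical index and the physical task id.
val rdd = spark.sparkContext.parallelize(Seq(1, 2), numSlices = 2)
  .mapPartitionsWithIndex { (index, iter) =>
    iter.map(x => (index, TaskContext.getPartitionId(), x))
  }

// The second leg of the union reuses logical indices 0 and 1, but its
// tasks run with physical partition ids 2 and 3.
rdd.union(rdd).collect().foreach(println)
// e.g. (0,0,1), (1,1,2), (0,2,1), (1,3,2)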

When is it safe to use TaskContext.partitionId? It is safe at the very end
of a query plan (the root node), because there the logical partition index
is guaranteed, under the current implementation, to be the same as the
physical task partition id.
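
And a sketch of the safe case (again just an illustration): when
mapPartitionsWithIndex is the last transformation before collecting, its
index lines up with TaskContext.partitionId.

import org.apache.spark.TaskContext

// When the indexed map is the final step of the lineage, the logical
// index and the physical task partition id coincide.
spark.sparkContext.parallelize(0 until 4, numSlices = 4)
  .coalesce(2)
  .mapPartitionsWithIndex { (index, iter) =>
    iter.map(_ => (index, TaskContext.getPartitionId()))
  }
  .collect()
// every pair should satisfy index == partitionId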


For example, prior to Xiangrui's PR, the following query would return 2
rows, whereas the correct result is 1 row:

spark.range(1).selectExpr("rand(1)")
  .union(spark.range(1).selectExpr("rand(1)"))
  .distinct.show()

The reason it'd return 2 rows is that rand was using
TaskContext.partitionId as the per-partition seed, so the two sides of the
union were using different seeds.
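
To see the seeding problem in isolation, here is a hedged sketch, with
plain scala.util.Random standing in for Spark's internal per-partition
generator (the actual seeding scheme inside Spark may differ in detail):

import scala.util.Random

// Stand-in for the per-partition RNG: seeded from the user-supplied
// seed combined with a partition index.
def randForPartition(userSeed: Long, partitionIndex: Int): Double =
  new Random(userSeed + partitionIndex).nextDouble()

// With the logical index, both legs of the union are partition 0 of
// their own RDD, so they draw the same value and distinct keeps 1 row:
randForPartition(1L, 0) == randForPartition(1L, 0)  // true

// With TaskContext.partitionId, the second leg runs as physical
// partition 1, so the values differ and distinct keeps 2 rows:
randForPartition(1L, 0) == randForPartition(1L, 1)  // false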


I'm starting to think we should deprecate the API
(TaskContext.partitionId) and ban its use within the project, just to be
safe ...
