Seems like a good new API to add?

On Thu, Oct 20, 2016 at 11:14 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Access to the partition ID is necessary for basically every single one
> of my jobs, and there isn't a foreachPartiionWithIndex equivalent.
> You can kind of work around it with empty foreach after the map, but
> it's really awkward to explain to people.
>
> On Thu, Oct 20, 2016 at 12:52 PM, Reynold Xin <r...@databricks.com> wrote:
> > FYI - Xiangrui submitted an amazing pull request to fix a long standing
> > issue with a lot of the nondeterministic expressions (rand, randn,
> > monotonically_increasing_id): https://github.com/apache/spark/pull/15567
> >
> > Prior to this PR, we were using TaskContext.partitionId as the partition
> > index in initializing expressions. However, that is actually not a good
> > index to use in most cases, because it is the physical task's partition
> id
> > and does not always reflect the partition index at the time the RDD is
> > created (or in the Spark SQL physical plan). This makes a big difference
> > once there is a union or coalesce operation.
> >
> > The "index" given by mapPartitionsWithIndex, on the other hand, does not
> > have this problem because it actually reflects the logical partition
> index
> > at the time the RDD is created.
> >
> > When is it safe to use TaskContext.partitionId? It is safe at the very
> end
> > of a query plan (the root node), because there partitionId is guaranteed
> > based on the current implementation to be the same as the physical task
> > partition id.
> >
> >
> > For example, prior to Xiangrui's PR, the following query would return 2
> > rows, whereas the correct behavior should be 1 entry:
> >
> > spark.range(1).selectExpr("rand(1)").union(spark.range(1)
> .selectExpr("rand(1)")).distinct.show()
> >
> > The reason it'd return 2 rows is because rand was using
> > TaskContext.partitionId as the per-partition seed, and as a result the
> two
> > sides of the union are using different seeds.
> >
> >
> > I'm starting to think we should deprecate the API and ban the use of it
> > within the project to be safe ...
> >
> >
>

Reply via email to