Yep, I had submitted a PR that included it way back in the original
direct stream for kafka, but it got nixed in favor of
TaskContext.partitionId ;)  The concern then was about too many
xWithBlah apis on rdd.

If we do want to deprecate taskcontext.partitionId and add
foreachPartitionWithIndex, I think that makes sense, I can start a
ticket.

On Thu, Oct 20, 2016 at 1:16 PM, Reynold Xin <r...@databricks.com> wrote:
> Seems like a good new API to add?
>
>
> On Thu, Oct 20, 2016 at 11:14 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Access to the partition ID is necessary for basically every single one
>> of my jobs, and there isn't a foreachPartiionWithIndex equivalent.
>> You can kind of work around it with empty foreach after the map, but
>> it's really awkward to explain to people.
>>
>> On Thu, Oct 20, 2016 at 12:52 PM, Reynold Xin <r...@databricks.com> wrote:
>> > FYI - Xiangrui submitted an amazing pull request to fix a long standing
>> > issue with a lot of the nondeterministic expressions (rand, randn,
>> > monotonically_increasing_id): https://github.com/apache/spark/pull/15567
>> >
>> > Prior to this PR, we were using TaskContext.partitionId as the partition
>> > index in initializing expressions. However, that is actually not a good
>> > index to use in most cases, because it is the physical task's partition
>> > id
>> > and does not always reflect the partition index at the time the RDD is
>> > created (or in the Spark SQL physical plan). This makes a big difference
>> > once there is a union or coalesce operation.
>> >
>> > The "index" given by mapPartitionsWithIndex, on the other hand, does not
>> > have this problem because it actually reflects the logical partition
>> > index
>> > at the time the RDD is created.
>> >
>> > When is it safe to use TaskContext.partitionId? It is safe at the very
>> > end
>> > of a query plan (the root node), because there partitionId is guaranteed
>> > based on the current implementation to be the same as the physical task
>> > partition id.
>> >
>> >
>> > For example, prior to Xiangrui's PR, the following query would return 2
>> > rows, whereas the correct behavior should be 1 entry:
>> >
>> >
>> > spark.range(1).selectExpr("rand(1)").union(spark.range(1).selectExpr("rand(1)")).distinct.show()
>> >
>> > The reason it'd return 2 rows is because rand was using
>> > TaskContext.partitionId as the per-partition seed, and as a result the
>> > two
>> > sides of the union are using different seeds.
>> >
>> >
>> > I'm starting to think we should deprecate the API and ban the use of it
>> > within the project to be safe ...
>> >
>> >
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org

Reply via email to