Re: [PSA] TaskContext.partitionId != the actual logical partition index

2016-10-20 Thread Cody Koeninger
Yep, I had submitted a PR that included it way back in the original
direct stream for Kafka, but it got nixed in favor of
TaskContext.partitionId ;)  The concern then was about adding too many
xWithBlah APIs on RDD.

If we do want to deprecate TaskContext.partitionId and add
foreachPartitionWithIndex, I think that makes sense; I can start a
ticket.
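
A rough sketch of what that could look like, emulated on top of
mapPartitionsWithIndex (the helper below is illustrative, not an existing
Spark API; the empty foreach is the workaround trick mentioned earlier in
the thread):

  import org.apache.spark.rdd.RDD

  def foreachPartitionWithIndex[T](rdd: RDD[T])(f: (Int, Iterator[T]) => Unit): Unit =
    rdd.mapPartitionsWithIndex { (idx, iter) =>
      f(idx, iter)                    // user code sees the logical partition index
      Iterator.empty: Iterator[Unit]  // nothing to return; side effect only
    }.foreach(_ => ())                // no-op action that forces evaluation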

On Thu, Oct 20, 2016 at 1:16 PM, Reynold Xin  wrote:
> Seems like a good new API to add?
>
>
> On Thu, Oct 20, 2016 at 11:14 AM, Cody Koeninger  wrote:
>>
>> Access to the partition ID is necessary for basically every single one
>> of my jobs, and there isn't a foreachPartitionWithIndex equivalent.
>> You can kind of work around it with an empty foreach after the map, but
>> it's really awkward to explain to people.
>>
>> On Thu, Oct 20, 2016 at 12:52 PM, Reynold Xin  wrote:
>> > FYI - Xiangrui submitted an amazing pull request to fix a long-standing
>> > issue with a lot of the nondeterministic expressions (rand, randn,
>> > monotonically_increasing_id): https://github.com/apache/spark/pull/15567
>> >
>> > Prior to this PR, we were using TaskContext.partitionId as the partition
>> > index when initializing expressions. However, that is actually not a good
>> > index to use in most cases, because it is the physical task's partition id
>> > and does not always reflect the partition index at the time the RDD is
>> > created (or in the Spark SQL physical plan). This makes a big difference
>> > once there is a union or coalesce operation.
>> >
>> > The "index" given by mapPartitionsWithIndex, on the other hand, does not
>> > have this problem because it actually reflects the logical partition
>> > index
>> > at the time the RDD is created.
>> >
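>> > To make that concrete, a minimal sketch (assuming a SparkContext named
>> > sc; the commented results are what these semantics imply, not captured
>> > output):
>> >
>> >   import org.apache.spark.TaskContext
>> >
>> >   val a = sc.parallelize(Seq(1), 1)
>> >   val b = sc.parallelize(Seq(1), 1)
>> >
>> >   // Logical index, fixed when each RDD is created: both sides see 0.
>> >   a.mapPartitionsWithIndex((i, it) => it.map(_ => i))
>> >     .union(b.mapPartitionsWithIndex((i, it) => it.map(_ => i)))
>> >     .collect()                               // Array(0, 0)
>> >
>> >   // Physical task partition id, assigned when the union job runs.
>> >   a.map(_ => TaskContext.getPartitionId())
>> >     .union(b.map(_ => TaskContext.getPartitionId()))
>> >     .collect()                               // Array(0, 1)
>> >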
>> > When is it safe to use TaskContext.partitionId? It is safe at the very end
>> > of a query plan (the root node), because there partitionId is guaranteed,
>> > based on the current implementation, to be the same as the physical task's
>> > partition id.
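>> >
>> > For instance, a sketch of the safe case, calling it inside the final
>> > action, where the action's tasks are the physical tasks (df and writeOut
>> > are placeholders, not from this thread):
>> >
>> >   df.rdd.foreachPartition { iter =>
>> >     val pid = org.apache.spark.TaskContext.getPartitionId()  // matches the logical index here
>> >     iter.foreach(row => writeOut(pid, row))                  // writeOut is a stand-in sink
>> >   }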
>> >
>> >
>> > For example, prior to Xiangrui's PR, the following query would return 2
>> > rows, whereas the correct result is 1 row:
>> >
>> >
>> > spark.range(1).selectExpr("rand(1)").union(spark.range(1).selectExpr("rand(1)")).distinct.show()
>> >
>> > The reason it'd return 2 rows is that rand was using
>> > TaskContext.partitionId as the per-partition seed, and as a result the two
>> > sides of the union were using different seeds.
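>> >
>> > (Illustrative only; Spark's internal RNG differs, but the pre-PR effect
>> > was roughly as if each side were seeded with seed + partition id:
>> >
>> >   import scala.util.Random
>> >   new Random(1 + 0).nextDouble()  // left side:  seed 1 + partition id 0
>> >   new Random(1 + 1).nextDouble()  // right side: seed 1 + partition id 1
>> >
>> > Different seeds, different draws, so distinct sees two rows.)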
>> >
>> >
>> > I'm starting to think we should deprecate the API and ban the use of it
>> > within the project to be safe ...
>> >
>> >
>
>



