Re: [PSA] TaskContext.partitionId != the actual logical partition index
Yep, I had submitted a PR that included it way back in the original direct stream for Kafka, but it got nixed in favor of TaskContext.partitionId ;) The concern then was about too many xWithBlah APIs on RDD. If we do want to deprecate TaskContext.partitionId and add foreachPartitionWithIndex, I think that makes sense; I can start a ticket.

On Thu, Oct 20, 2016 at 1:16 PM, Reynold Xin wrote:
> Seems like a good new API to add?
>
> On Thu, Oct 20, 2016 at 11:14 AM, Cody Koeninger wrote:
>> Access to the partition ID is necessary for basically every single one
>> of my jobs, and there isn't a foreachPartitionWithIndex equivalent.
>> You can kind of work around it with an empty foreach after the map, but
>> it's really awkward to explain to people.
>>
>> On Thu, Oct 20, 2016 at 12:52 PM, Reynold Xin wrote:
>> > FYI - Xiangrui submitted an amazing pull request to fix a long-standing
>> > issue with a lot of the nondeterministic expressions (rand, randn,
>> > monotonically_increasing_id): https://github.com/apache/spark/pull/15567
>> >
>> > Prior to this PR, we were using TaskContext.partitionId as the partition
>> > index in initializing expressions. However, that is actually not a good
>> > index to use in most cases, because it is the physical task's partition id
>> > and does not always reflect the partition index at the time the RDD is
>> > created (or in the Spark SQL physical plan). This makes a big difference
>> > once there is a union or coalesce operation.
>> >
>> > The "index" given by mapPartitionsWithIndex, on the other hand, does not
>> > have this problem, because it actually reflects the logical partition index
>> > at the time the RDD is created.
>> >
>> > When is it safe to use TaskContext.partitionId? It is safe at the very end
>> > of a query plan (the root node), because there partitionId is guaranteed,
>> > based on the current implementation, to be the same as the physical task
>> > partition id.
>> >
>> > For example, prior to Xiangrui's PR, the following query would return 2
>> > rows, whereas the correct behavior is 1 row:
>> >
>> > spark.range(1).selectExpr("rand(1)").union(spark.range(1).selectExpr("rand(1)")).distinct.show()
>> >
>> > The reason it'd return 2 rows is that rand was using
>> > TaskContext.partitionId as the per-partition seed, and as a result the two
>> > sides of the union were using different seeds.
>> >
>> > I'm starting to think we should deprecate the API and ban the use of it
>> > within the project to be safe ...
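Since the union example above is compact, here is a plain-Python sketch of the failure mode (the function name and seed-derivation formula are invented for illustration; this models the behavior described in the thread, not Spark's actual implementation):

```python
import random

def rand_first_draw(seed, partition):
    # Per-partition RNG seeded from the user seed plus a partition number,
    # mimicking how rand(seed) derives a distinct stream per partition.
    # (Illustrative formula, not Spark's actual seeding scheme.)
    return random.Random(seed * 1000003 + partition).random()

# Union of two single-partition datasets, each evaluating rand(1):
# the physical task ids across the union are 0 (left side) and 1 (right
# side), but the logical partition index within each side is 0 for both.
buggy = {rand_first_draw(1, 0), rand_first_draw(1, 1)}  # seeded by task id
fixed = {rand_first_draw(1, 0), rand_first_draw(1, 0)}  # seeded by logical index

print(len(buggy))  # 2 distinct values, so distinct keeps 2 rows
print(len(fixed))  # 1
```

Seeding from the physical task id gives the two sides of the union different seeds, so identical rand(1) expressions produce different values and survive distinct; seeding from the logical partition index makes both sides deterministic and identical.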
Re: [PSA] TaskContext.partitionId != the actual logical partition index
Seems like a good new API to add?

On Thu, Oct 20, 2016 at 11:14 AM, Cody Koeninger wrote:
> Access to the partition ID is necessary for basically every single one
> of my jobs, and there isn't a foreachPartitionWithIndex equivalent.
> You can kind of work around it with an empty foreach after the map, but
> it's really awkward to explain to people.
>
> On Thu, Oct 20, 2016 at 12:52 PM, Reynold Xin wrote:
> > FYI - Xiangrui submitted an amazing pull request to fix a long-standing
> > issue with a lot of the nondeterministic expressions (rand, randn,
> > monotonically_increasing_id): https://github.com/apache/spark/pull/15567
> > [...]
Re: [PSA] TaskContext.partitionId != the actual logical partition index
Access to the partition ID is necessary for basically every single one of my jobs, and there isn't a foreachPartitionWithIndex equivalent. You can kind of work around it with an empty foreach after the map, but it's really awkward to explain to people.

On Thu, Oct 20, 2016 at 12:52 PM, Reynold Xin wrote:
> FYI - Xiangrui submitted an amazing pull request to fix a long-standing
> issue with a lot of the nondeterministic expressions (rand, randn,
> monotonically_increasing_id): https://github.com/apache/spark/pull/15567
> [...]
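For anyone reading along, the "empty foreach after the map" workaround has roughly this shape. Below is a plain-Python model of the RDD semantics (partitions as a list of lists; function names are illustrative, not Spark's API):

```python
# Plain-Python model of the two RDD calls under discussion (not Spark itself).
# A "dataset" here is a list of lists; list position = logical partition index.

def map_partitions_with_index(partitions, f):
    """Models mapPartitionsWithIndex: f(index, iterator) -> iterator."""
    return [list(f(i, iter(part))) for i, part in enumerate(partitions)]

def foreach_partition_with_index(partitions, f):
    """The missing foreachPartitionWithIndex, expressed via the workaround:
    a mapPartitionsWithIndex that runs f for its side effects and emits
    nothing, followed by an action to force evaluation (the empty foreach)."""
    def wrapper(i, it):
        f(i, it)         # side effect with access to the logical index
        return iter(())  # emit nothing
    for _ in map_partitions_with_index(partitions, wrapper):
        pass             # the forcing action

seen = []
foreach_partition_with_index([[1, 2], [3]],
                             lambda i, it: seen.append((i, list(it))))
print(seen)  # [(0, [1, 2]), (1, [3])]
```

A dedicated foreachPartitionWithIndex would collapse the wrapper and the forcing action into a single call, which is the API addition being proposed.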