[ 
https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460137#comment-16460137
 ] 

Ruslan Dautkhanov commented on SPARK-23074:
-------------------------------------------

That's the concern: when we have a "record" that spans multiple text lines, and 
it happens that some of those lines land in one partition and the rest in 
another, what would monotonically_increasing_id() return? It wouldn't be 
consecutive, right?
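
For illustration, a minimal sketch (assuming a running SparkSession named 
spark): monotonically_increasing_id() puts the partition ID in the upper 31 
bits, so the values jump at every partition boundary.

{code:scala}
// Minimal sketch, assuming a running SparkSession named `spark`.
import org.apache.spark.sql.functions.monotonically_increasing_id

val df = spark.range(6).repartition(3)
df.withColumn("mid", monotonically_increasing_id()).show()
// Typical output: mid values like 0, 1, 8589934592, 8589934593, 17179869184, ...
// i.e. monotonically increasing, but NOT consecutive across partitions.
{code}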

See [https://stackoverflow.com/a/48454000/470583] - people are building quite 
an expensive workaround:

{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index"): DataFrame = {
  // Tag every row with its partition ID and a monotonically increasing
  // (but non-consecutive) ID.
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  // Per partition, compute the offset that turns its inc_id values into a
  // consecutive global index: the count of rows in all earlier partitions,
  // minus the partition's first inc_id, plus the desired starting offset.
  val partitionOffsets = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", first("inc_id") as "inc_id")
    .orderBy("partition_id")
    .select(sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "cnt")
    .collect()
    .map(_.getLong(0))

  // Add each row's partition offset to its inc_id to get the final index.
  dfWithPartitionId
    .withColumn("partition_offset",
      udf((partitionId: Int) => partitionOffsets(partitionId), LongType)(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
{code}
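
For completeness, calling it would look something like this (a sketch, assuming 
the imports above and a SparkSession named spark):

{code:scala}
// Build a small multi-partition DataFrame to exercise the helper.
val df = spark.range(0, 10).repartition(3).toDF("value")

// Adds a consecutive, 1-based "index" column regardless of partitioning.
zipWithIndex(df, offset = 1, indexName = "index").orderBy("index").show()
{code}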

Do you see an easier way to do this? Thanks!
 

> Dataframe-ified zipwithindex
> ----------------------------
>
>                 Key: SPARK-23074
>                 URL: https://issues.apache.org/jira/browse/SPARK-23074
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Ruslan Dautkhanov
>            Priority: Minor
>              Labels: dataframe, rdd
>
> Would be great to have a dataframe-friendly equivalent of rdd.zipWithIndex():
> {code:scala}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.types.{LongType, StructField, StructType}
> import org.apache.spark.sql.Row
>
> def dfZipWithIndex(
>   df: DataFrame,
>   offset: Int = 1,
>   colName: String = "id",
>   inFront: Boolean = true
> ): DataFrame = {
>   df.sqlContext.createDataFrame(
>     df.rdd.zipWithIndex.map(ln =>
>       Row.fromSeq(
>         (if (inFront) Seq(ln._2 + offset) else Seq())
>           ++ ln._1.toSeq ++
>         (if (inFront) Seq() else Seq(ln._2 + offset))
>       )
>     ),
>     StructType(
>       (if (inFront) Array(StructField(colName, LongType, false)) else Array[StructField]())
>         ++ df.schema.fields ++
>       (if (inFront) Array[StructField]() else Array(StructField(colName, LongType, false)))
>     )
>   )
> }
> {code}
> credits: 
> [https://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex]
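
FWIW, calling the proposed dfZipWithIndex would look something like this (a 
hypothetical sketch, assuming a SparkSession named spark):

{code:scala}
// Hypothetical usage of the dfZipWithIndex helper quoted above.
val people = spark.createDataFrame(Seq(("alice", 29), ("bob", 31))).toDF("name", "age")
dfZipWithIndex(people, offset = 1, colName = "id", inFront = true).show()
// +---+-----+---+
// | id| name|age|
// +---+-----+---+
// |  1|alice| 29|
// |  2|  bob| 31|
// +---+-----+---+
{code}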


