[ https://issues.apache.org/jira/browse/SPARK-23074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16460137#comment-16460137 ]
Ruslan Dautkhanov edited comment on SPARK-23074 at 5/1/18 8:56 PM:
-------------------------------------------------------------------

That's . When we have a "record" that spans multiple text lines, and some of those lines end up in one partition and the rest in another, what would monotonically_increasing_id() return? -It wouldn't be consecutive, right?- I just found the answer to this question in the [code |https://github.com/apache/spark/blob/7c1654e2159662e7e663ba141719d755002f770a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/MonotonicallyIncreasingID.scala#L27].

See [https://stackoverflow.com/a/48454000/470583]; people are resorting to quite an expensive workaround:
{code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index"): DataFrame = {
  // Tag every row with its partition and a unique, increasing (but not consecutive) ID
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  // Per partition: (rows in earlier partitions) - (first inc_id of this partition) + offset.
  // min() instead of first(): first() is non-deterministic after the groupBy shuffle.
  // Keyed by partition_id: empty partitions produce no group, so positional indexing would misalign.
  val partitionOffsets: Map[Int, Long] = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", min("inc_id") as "inc_id")
    .select(
      col("partition_id"),
      sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "offset")
    .collect()
    .map(r => r.getInt(0) -> r.getLong(1))
    .toMap

  val partitionOffset = udf((partitionId: Int) => partitionOffsets(partitionId))

  dfWithPartitionId
    .withColumn("partition_offset", partitionOffset(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
{code}
Do you see an easier way to do this? Thanks!

was (Author: tagar):
That's . When we have a "record" that spans multiple text lines, and some of those lines end up in one partition and the rest in another, what would monotonically_increasing_id() return? It wouldn't be consecutive, right?
See [https://stackoverflow.com/a/48454000/470583]; people are resorting to quite an expensive workaround:
{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def zipWithIndex(df: DataFrame, offset: Long = 1, indexName: String = "index"): DataFrame = {
  // Tag every row with its partition and a unique, increasing (but not consecutive) ID
  val dfWithPartitionId = df
    .withColumn("partition_id", spark_partition_id())
    .withColumn("inc_id", monotonically_increasing_id())

  // Per partition: (rows in earlier partitions) - (first inc_id of this partition) + offset.
  // min() instead of first(): first() is non-deterministic after the groupBy shuffle.
  // Keyed by partition_id: empty partitions produce no group, so positional indexing would misalign.
  val partitionOffsets: Map[Int, Long] = dfWithPartitionId
    .groupBy("partition_id")
    .agg(count(lit(1)) as "cnt", min("inc_id") as "inc_id")
    .select(
      col("partition_id"),
      sum("cnt").over(Window.orderBy("partition_id")) - col("cnt") - col("inc_id") + lit(offset) as "offset")
    .collect()
    .map(r => r.getInt(0) -> r.getLong(1))
    .toMap

  val partitionOffset = udf((partitionId: Int) => partitionOffsets(partitionId))

  dfWithPartitionId
    .withColumn("partition_offset", partitionOffset(col("partition_id")))
    .withColumn(indexName, col("partition_offset") + col("inc_id"))
    .drop("partition_id", "partition_offset", "inc_id")
}
{code}
Do you see an easier way to do this? Thanks!

> Dataframe-ified zipwithindex
> ----------------------------
>
>                 Key: SPARK-23074
>                 URL: https://issues.apache.org/jira/browse/SPARK-23074
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Ruslan Dautkhanov
>            Priority: Minor
>              Labels: dataframe, rdd
>
> It would be great to have a DataFrame-friendly equivalent of rdd.zipWithIndex():
> {code:scala}
> import org.apache.spark.sql.DataFrame
> import org.apache.spark.sql.types.{LongType, StructField, StructType}
> import org.apache.spark.sql.Row
>
> def dfZipWithIndex(
>   df: DataFrame,
>   offset: Int = 1,
>   colName: String = "id",
>   inFront: Boolean = true
> ) : DataFrame = {
>   df.sqlContext.createDataFrame(
>     df.rdd.zipWithIndex.map(ln =>
>       Row.fromSeq(
>         (if (inFront) Seq(ln._2 + offset) else Seq())
>           ++ ln._1.toSeq ++
>         (if (inFront) Seq() else Seq(ln._2 + offset))
>       )
>     ),
>     StructType(
>       (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())
>         ++ df.schema.fields ++
>       (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
>     )
>   )
> }
> {code}
> credits:
> [https://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
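On the question in the comment about monotonically_increasing_id(): the linked MonotonicallyIncreasingID.scala documents that the generated ID puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The IDs are therefore unique and increasing but jump between partitions, and the workaround turns them into a consecutive index by adding a per-partition offset. Below is a minimal plain-Scala sketch of that arithmetic, assuming a local `Seq[Seq[A]]` stands in for a DataFrame's partitions; the object and method names (`ZipWithIndexSketch`, `tagRows`, `partitionOffsets`) are hypothetical and not Spark API.

```scala
// Plain-Scala sketch, no Spark dependency: a Seq of Seqs stands in for a
// DataFrame's partitions. Only the 31/33-bit ID layout comes from Spark's
// MonotonicallyIncreasingID.scala; all names here are hypothetical.
object ZipWithIndexSketch {

  // Documented layout: partition ID in the upper 31 bits,
  // record number within the partition in the lower 33 bits.
  def monotonicId(partitionId: Int, recordNumber: Long): Long =
    (partitionId.toLong << 33) + recordNumber

  // Stand-in for spark_partition_id() + monotonically_increasing_id():
  // yields (row, partitionId, incId) for every row.
  def tagRows[A](partitions: Seq[Seq[A]]): Seq[(A, Int, Long)] =
    for {
      (rows, pid) <- partitions.zipWithIndex
      (row, i)    <- rows.zipWithIndex
    } yield (row, pid, monotonicId(pid, i.toLong))

  // Same formula as the workaround:
  // offset(p) = (rows in earlier partitions) - (smallest incId in p) + offset.
  // Keyed by partition ID, so empty partitions are simply absent.
  def partitionOffsets[A](tagged: Seq[(A, Int, Long)], offset: Long): Map[Int, Long] = {
    // groupBy("partition_id").agg(count, min("inc_id")), ordered by partition ID
    val stats = tagged.groupBy(_._2).toSeq.sortBy(_._1).map { case (pid, rows) =>
      (pid, rows.size.toLong, rows.map(_._3).min)
    }
    // Running sum of counts over earlier partitions only
    val rowsBefore = stats.map(_._2).scanLeft(0L)(_ + _)
    stats.zip(rowsBefore).map { case ((pid, _, minId), before) =>
      pid -> (before - minId + offset)
    }.toMap
  }

  // Consecutive index = partition offset + incId
  def zipWithIndex[A](partitions: Seq[Seq[A]], offset: Long = 1): Seq[(A, Long)] = {
    val tagged  = tagRows(partitions)
    val offsets = partitionOffsets(tagged, offset)
    tagged.map { case (row, pid, incId) => (row, offsets(pid) + incId) }
  }

  def main(args: Array[String]): Unit = {
    // The second partition is empty on purpose
    val parts = Seq(Seq("a", "b"), Seq.empty[String], Seq("c"), Seq("d", "e"))
    println(zipWithIndex(parts)) // List((a,1), (b,2), (c,3), (d,4), (e,5))
  }
}
```

Keying the offsets by partition ID is the design choice that matters here: an empty partition produces no group after the aggregation, so looking offsets up by array position would hand later partitions the wrong offset.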