This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 43d332d2b9d5 [SPARK-55228][SPARK-55230][SQL][CONNECT] Implement Dataset.zipWithIndex in Scala API
43d332d2b9d5 is described below
commit 43d332d2b9d5939c99321b3cdbebf9e6d5d91d5a
Author: Fangchen Li <[email protected]>
AuthorDate: Thu Feb 5 16:20:27 2026 +0800
[SPARK-55228][SPARK-55230][SQL][CONNECT] Implement Dataset.zipWithIndex in Scala API
### What changes were proposed in this pull request?
Implement `Dataset.zipWithIndex` in the Scala API. The new methods append a consecutive 0-based Long index column by projecting the internal `distributed_sequence_id` expression.
### Why are the changes needed?
Align the Dataset API with the RDD API, which already provides `RDD.zipWithIndex()`.
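For reference, a minimal sketch of the existing RDD counterpart (assuming a running `SparkSession` named `spark`):

```scala
// Existing RDD API: pairs each element with a consecutive 0-based Long index.
val rdd = spark.sparkContext.parallelize(Seq("a", "b", "c"))
rdd.zipWithIndex().collect()
// => Array(("a", 0L), ("b", 1L), ("c", 2L))
```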
### Does this PR introduce _any_ user-facing change?
Yes, this PR adds `Dataset.zipWithIndex()` and `Dataset.zipWithIndex(indexColName: String)`.
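A minimal usage sketch of the new methods (the row/index pairing shown assumes a single, ordered input partition):

```scala
val df = spark.range(3)

// Appends a Long column named "index" (the default) as the last column.
df.zipWithIndex().show()
// +---+-----+
// | id|index|
// +---+-----+
// |  0|    0|
// |  1|    1|
// |  2|    2|
// +---+-----+

// Appends the index under a custom column name instead.
df.zipWithIndex("row_num")
```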
### How was this patch tested?
Unit tests added.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.5
Closes #54014 from fangchenli/dataset-zip-with-index.
Authored-by: Fangchen Li <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
project/MimaExcludes.scala | 4 +-
.../main/scala/org/apache/spark/sql/Dataset.scala | 31 ++++++++
.../apache/spark/sql/PlanGenerationTestSuite.scala | 8 ++
.../spark/sql/connect/ClientE2ETestSuite.scala | 36 +++++++++
.../explain-results/zipWithIndex.explain | 4 +
.../zipWithIndex_custom_column.explain | 4 +
.../query-tests/queries/zipWithIndex.json | 81 +++++++++++++++++++++
.../query-tests/queries/zipWithIndex.proto.bin | Bin 0 -> 644 bytes
.../queries/zipWithIndex_custom_column.json | 81 +++++++++++++++++++++
.../queries/zipWithIndex_custom_column.proto.bin | Bin 0 -> 647 bytes
.../scala/org/apache/spark/sql/DatasetSuite.scala | 42 +++++++++++
11 files changed, 290 insertions(+), 1 deletion(-)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c4e39e070f1a..9add6542841e 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -40,7 +40,9 @@ object MimaExcludes {
// [SPARK-47086][BUILD][CORE][WEBUI] Upgrade Jetty to 12.1.4
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ui.ProxyRedirectHandler$ResponseWrapper"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.sendRedirect"),
- ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.this")
+ ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ProxyRedirectHandler#ResponseWrapper.this"),
+ // [SPARK-55228][SQL] Implement Dataset.zipWithIndex in Scala API
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.Dataset.zipWithIndex")
)
// Exclude rules for 4.1.x from 4.0.0
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6b06ce58df6b..0f1fe314c350 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2010,6 +2010,37 @@ abstract class Dataset[T] extends Serializable {
*/
def exceptAll(other: Dataset[T]): Dataset[T]
+ /**
+ * Returns a new `Dataset` by appending a column containing consecutive 0-based Long indices,
+ * similar to `RDD.zipWithIndex()`.
+ *
+ * The index column is appended as the last column of the resulting `DataFrame`.
+ *
+ * @group untypedrel
+ * @since 4.2.0
+ */
+ def zipWithIndex(): DataFrame = zipWithIndex("index")
+
+ /**
+ * Returns a new `Dataset` by appending a column containing consecutive 0-based Long indices,
+ * similar to `RDD.zipWithIndex()`.
+ *
+ * The index column is appended as the last column of the resulting `DataFrame`.
+ *
+ * @note
+ * If a column with `indexColName` already exists in the schema, the resulting `DataFrame`
+ * will have duplicate column names. Selecting the duplicate column by name will throw
+ * `AMBIGUOUS_REFERENCE`, and writing the `DataFrame` will throw `COLUMN_ALREADY_EXISTS`.
+ *
+ * @param indexColName
+ * The name of the index column to append.
+ * @group untypedrel
+ * @since 4.2.0
+ */
+ def zipWithIndex(indexColName: String): DataFrame = {
+ select(col("*"), Column.internalFn("distributed_sequence_id").alias(indexColName))
+ }
+
/**
* Returns a new [[Dataset]] by sampling a fraction of rows (without replacement), using a
* user-supplied seed.
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index a21de2dbfaa6..852e832ded85 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -684,6 +684,14 @@ class PlanGenerationTestSuite extends ConnectFunSuite with Logging {
simple.withMetadata("id", builder.build())
}
+ test("zipWithIndex") {
+ simple.zipWithIndex()
+ }
+
+ test("zipWithIndex custom column") {
+ simple.zipWithIndex("my_index")
+ }
+
test("drop single string") {
simple.drop("a")
}
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
index 5ff030b56d4c..52d87087805f 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
@@ -772,6 +772,42 @@ class ClientE2ETestSuite
assert(spark.range(10).count() === 10)
}
+ test("Dataset zipWithIndex") {
+ val df = spark.range(5).repartition(3)
+ val result = df.zipWithIndex()
+ assert(result.columns === Array("id", "index"))
+ assert(result.schema.last.dataType === LongType)
+ val indices = result.collect().map(_.getLong(1)).sorted
+ assert(indices === (0L until 5L).toArray)
+ }
+
+ test("Dataset zipWithIndex with custom column name") {
+ val result = spark.range(3).zipWithIndex("row_num")
+ assert(result.columns === Array("id", "row_num"))
+ val indices = result.collect().map(_.getLong(1)).sorted
+ assert(indices === Array(0L, 1L, 2L))
+ }
+
+ test("Dataset zipWithIndex should throw AMBIGUOUS_REFERENCE when selecting
duplicate column") {
+ val df = spark.range(3).withColumnRenamed("id", "index")
+ val result = df.zipWithIndex() // Creates df with two "index" columns
+ val ex = intercept[AnalysisException] {
+ result.select("index").collect()
+ }
+ assert(ex.getCondition == "AMBIGUOUS_REFERENCE")
+ }
+
+ test("Dataset zipWithIndex should throw COLUMN_ALREADY_EXISTS when writing
duplicate columns") {
+ val df = spark.range(3).withColumnRenamed("id", "index")
+ val result = df.zipWithIndex() // Creates df with two "index" columns
+ withTempPath { path =>
+ val ex = intercept[AnalysisException] {
+ result.write.parquet(path.getAbsolutePath)
+ }
+ assert(ex.getCondition == "COLUMN_ALREADY_EXISTS")
+ }
+ }
+
test("Dataset collect tuple") {
val session = spark
import session.implicits._
diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex.explain
new file mode 100644
index 000000000000..c1ee3b77d979
--- /dev/null
+++ b/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex.explain
@@ -0,0 +1,4 @@
+Project [id#0L, a#0, b#0, index#0L]
++- Project [id#0L, a#0, b#0, distributed_sequence_id#0L AS index#0L]
+ +- AttachDistributedSequence[distributed_sequence_id#0L, id#0L, a#0, b#0] Index: distributed_sequence_id#0L
+ +- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex_custom_column.explain b/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex_custom_column.explain
new file mode 100644
index 000000000000..638f222caa67
--- /dev/null
+++ b/sql/connect/common/src/test/resources/query-tests/explain-results/zipWithIndex_custom_column.explain
@@ -0,0 +1,4 @@
+Project [id#0L, a#0, b#0, my_index#0L]
++- Project [id#0L, a#0, b#0, distributed_sequence_id#0L AS my_index#0L]
+ +- AttachDistributedSequence[distributed_sequence_id#0L, id#0L, a#0, b#0] Index: distributed_sequence_id#0L
+ +- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json
new file mode 100644
index 000000000000..9c0278561a61
--- /dev/null
+++ b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json
@@ -0,0 +1,81 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedStar": {
+ "planId": "0"
+ },
+ "common": {
+ "origin": {
+ "jvmOrigin": {
+ "stackTrace": [{
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.Dataset",
+ "methodName": "zipWithIndex",
+ "fileName": "Dataset.scala"
+ }, {
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+ "methodName": "~~trimmed~anonfun~~",
+ "fileName": "PlanGenerationTestSuite.scala"
+ }]
+ }
+ }
+ }
+ }, {
+ "alias": {
+ "expr": {
+ "unresolvedFunction": {
+ "functionName": "distributed_sequence_id",
+ "isInternal": true
+ },
+ "common": {
+ "origin": {
+ "jvmOrigin": {
+ "stackTrace": [{
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.Dataset",
+ "methodName": "zipWithIndex",
+ "fileName": "Dataset.scala"
+ }, {
+ "classLoaderName": "app",
+ "declaringClass":
"org.apache.spark.sql.PlanGenerationTestSuite",
+ "methodName": "~~trimmed~anonfun~~",
+ "fileName": "PlanGenerationTestSuite.scala"
+ }]
+ }
+ }
+ }
+ },
+ "name": ["index"]
+ },
+ "common": {
+ "origin": {
+ "jvmOrigin": {
+ "stackTrace": [{
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.Dataset",
+ "methodName": "zipWithIndex",
+ "fileName": "Dataset.scala"
+ }, {
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+ "methodName": "~~trimmed~anonfun~~",
+ "fileName": "PlanGenerationTestSuite.scala"
+ }]
+ }
+ }
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.proto.bin
new file mode 100644
index 000000000000..2109d63291ee
Binary files /dev/null and b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.proto.bin differ
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.json b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.json
new file mode 100644
index 000000000000..80229004d7a8
--- /dev/null
+++ b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.json
@@ -0,0 +1,81 @@
+{
+ "common": {
+ "planId": "1"
+ },
+ "project": {
+ "input": {
+ "common": {
+ "planId": "0"
+ },
+ "localRelation": {
+ "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+ }
+ },
+ "expressions": [{
+ "unresolvedStar": {
+ "planId": "0"
+ },
+ "common": {
+ "origin": {
+ "jvmOrigin": {
+ "stackTrace": [{
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.Dataset",
+ "methodName": "zipWithIndex",
+ "fileName": "Dataset.scala"
+ }, {
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+ "methodName": "~~trimmed~anonfun~~",
+ "fileName": "PlanGenerationTestSuite.scala"
+ }]
+ }
+ }
+ }
+ }, {
+ "alias": {
+ "expr": {
+ "unresolvedFunction": {
+ "functionName": "distributed_sequence_id",
+ "isInternal": true
+ },
+ "common": {
+ "origin": {
+ "jvmOrigin": {
+ "stackTrace": [{
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.Dataset",
+ "methodName": "zipWithIndex",
+ "fileName": "Dataset.scala"
+ }, {
+ "classLoaderName": "app",
+ "declaringClass":
"org.apache.spark.sql.PlanGenerationTestSuite",
+ "methodName": "~~trimmed~anonfun~~",
+ "fileName": "PlanGenerationTestSuite.scala"
+ }]
+ }
+ }
+ }
+ },
+ "name": ["my_index"]
+ },
+ "common": {
+ "origin": {
+ "jvmOrigin": {
+ "stackTrace": [{
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.Dataset",
+ "methodName": "zipWithIndex",
+ "fileName": "Dataset.scala"
+ }, {
+ "classLoaderName": "app",
+ "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite",
+ "methodName": "~~trimmed~anonfun~~",
+ "fileName": "PlanGenerationTestSuite.scala"
+ }]
+ }
+ }
+ }
+ }]
+ }
+}
\ No newline at end of file
diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.proto.bin
new file mode 100644
index 000000000000..0b60290a4241
Binary files /dev/null and b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.proto.bin differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 2aefd011851e..ef053d638701 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2997,6 +2997,48 @@ class DatasetSuite extends QueryTest
val metrics = observation.get
assert(metrics.isEmpty)
}
+
+ test("zipWithIndex should append consecutive 0-based indices") {
+ val ds = Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("e",
5)).toDS().repartition(3)
+ val result = ds.zipWithIndex()
+
+ // Index column should be the last column
+ assert(result.columns === Array("_1", "_2", "index"))
+ assert(result.schema.last.dataType === LongType)
+
+ // Indices should be consecutive 0-based
+ val indices = result.collect().map(_.getLong(2)).sorted
+ assert(indices === (0L until 5L).toArray)
+ }
+
+ test("zipWithIndex with custom column name") {
+ val ds = Seq(1, 2, 3, 4, 5).toDS()
+ val result = ds.zipWithIndex("row_num")
+
+ assert(result.columns === Array("value", "row_num"))
+ val indices = result.collect().map(_.getLong(1)).sorted
+ assert(indices === (0L until 5L).toArray)
+ }
+
+ test("zipWithIndex should throw AMBIGUOUS_REFERENCE when selecting duplicate
column") {
+ val ds = Seq(("a", 1), ("b", 2)).toDF("_1", "index")
+ val result = ds.zipWithIndex() // Creates df with two "index" columns
+ val ex = intercept[AnalysisException] {
+ result.select("index").collect()
+ }
+ assert(ex.getCondition == "AMBIGUOUS_REFERENCE")
+ }
+
+ test("zipWithIndex should throw COLUMN_ALREADY_EXISTS when writing duplicate
columns") {
+ val ds = Seq(("a", 1), ("b", 2)).toDF("_1", "index")
+ val result = ds.zipWithIndex() // Creates df with two "index" columns
+ withTempPath { path =>
+ val ex = intercept[AnalysisException] {
+ result.write.parquet(path.getAbsolutePath)
+ }
+ assert(ex.getCondition == "COLUMN_ALREADY_EXISTS")
+ }
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]