This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git
The following commit(s) were added to refs/heads/main by this push:
new 5ded181 [SPARK-52066] Support `unpivot/melt/transpose` in `DataFrame`
5ded181 is described below
commit 5ded181f8fe0af86b68f83df3b13d7e0d30966ea
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat May 10 10:06:23 2025 -0700
[SPARK-52066] Support `unpivot/melt/transpose` in `DataFrame`
### What changes were proposed in this pull request?
This PR aims to add `unpivot`, `melt`, `transpose` API of `DataFrame`.
### Why are the changes needed?
For feature parity.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #123 from dongjoon-hyun/SPARK-52066.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
Sources/SparkConnect/DataFrame.swift | 104 ++++++++++++++++++++++++++
Sources/SparkConnect/SparkConnectClient.swift | 47 ++++++++++++
Tests/SparkConnectTests/DataFrameTests.swift | 45 +++++++++++
3 files changed, 196 insertions(+)
diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index 5771588..ff30f25 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -109,6 +109,10 @@ import Synchronization
/// - ``dropDuplicatesWithinWatermark(_:)``
/// - ``distinct()``
/// - ``withColumnRenamed(_:_:)``
+/// - ``unpivot(_:_:_:)``
+/// - ``unpivot(_:_:_:_:)``
+/// - ``melt(_:_:_:)``
+/// - ``melt(_:_:_:_:)``
///
/// ### Join Operations
/// - ``join(_:)``
@@ -1202,6 +1206,106 @@ public actor DataFrame: Sendable {
return dropDuplicates()
}
+ /// Transposes a DataFrame, switching rows to columns. This function transforms the DataFrame
+ /// such that the values in the first column become the new columns of the DataFrame.
+ /// - Returns: A transposed ``DataFrame``.
+ public func transpose() -> DataFrame {
+ return buildTranspose([])
+ }
+
+ /// Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+ /// set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+ /// which cannot be reversed. This is an alias for `unpivot`.
+ /// - Parameters:
+ /// - ids: ID column names
+ /// - values: Value column names to unpivot
+ /// - variableColumnName: Name of the variable column
+ /// - valueColumnName: Name of the value column
+ /// - Returns: A ``DataFrame``.
+ public func melt(
+ _ ids: [String],
+ _ values: [String],
+ _ variableColumnName: String,
+ _ valueColumnName: String
+ ) -> DataFrame {
+ return unpivot(ids, values, variableColumnName, valueColumnName)
+ }
+
+ /// Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+ /// set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+ /// which cannot be reversed. This is an alias for `unpivot`.
+ /// - Parameters:
+ /// - ids: ID column names
+ /// - variableColumnName: Name of the variable column
+ /// - valueColumnName: Name of the value column
+ /// - Returns: A ``DataFrame``.
+ public func melt(
+ _ ids: [String],
+ _ variableColumnName: String,
+ _ valueColumnName: String
+ ) -> DataFrame {
+ return unpivot(ids, variableColumnName, valueColumnName)
+ }
+
+ /// Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+ /// set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+ /// which cannot be reversed.
+ /// - Parameters:
+ /// - ids: ID column names
+ /// - values: Value column names to unpivot
+ /// - variableColumnName: Name of the variable column
+ /// - valueColumnName: Name of the value column
+ /// - Returns: A ``DataFrame``.
+ public func unpivot(
+ _ ids: [String],
+ _ values: [String],
+ _ variableColumnName: String,
+ _ valueColumnName: String
+ ) -> DataFrame {
+ return buildUnpivot(ids, values, variableColumnName, valueColumnName)
+ }
+
+ /// Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+ /// set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+ /// which cannot be reversed.
+ /// - Parameters:
+ /// - ids: ID column names
+ /// - variableColumnName: Name of the variable column
+ /// - valueColumnName: Name of the value column
+ /// - Returns: A ``DataFrame``.
+ public func unpivot(
+ _ ids: [String],
+ _ variableColumnName: String,
+ _ valueColumnName: String
+ ) -> DataFrame {
+ return buildUnpivot(ids, nil, variableColumnName, valueColumnName)
+ }
+
+ func buildUnpivot(
+ _ ids: [String],
+ _ values: [String]?,
+ _ variableColumnName: String,
+ _ valueColumnName: String,
+ ) -> DataFrame {
+ let plan = SparkConnectClient.getUnpivot(self.plan.root, ids, values, variableColumnName, valueColumnName)
+ return DataFrame(spark: self.spark, plan: plan)
+ }
+
+ /// Transposes a ``DataFrame`` such that the values in the specified index column become the new
+ /// columns of the ``DataFrame``.
+ /// - Parameter indexColumn: The single column that will be treated as the index for the transpose operation.
+ /// This column will be used to pivot the data, transforming the DataFrame such that the values of
+ /// the indexColumn become the new columns in the transposed DataFrame.
+ /// - Returns: A transposed ``DataFrame``.
+ public func transpose(_ indexColumn: String) -> DataFrame {
+ return buildTranspose([indexColumn])
+ }
+
+ func buildTranspose(_ indexColumn: [String]) -> DataFrame {
+ let plan = SparkConnectClient.getTranspose(self.plan.root, indexColumn)
+ return DataFrame(spark: self.spark, plan: plan)
+ }
+
/// Groups the DataFrame using the specified columns.
///
/// This method is used to perform aggregations on groups of data.
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index 309b5a0..d74528b 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -920,6 +920,53 @@ public actor SparkConnectClient {
return plan
}
+ static func getUnpivot(
+ _ child: Relation,
+ _ ids: [String],
+ _ values: [String]?,
+ _ variableColumnName: String,
+ _ valueColumnName: String,
+ ) -> Plan {
+ var unpivot = Spark_Connect_Unpivot()
+ unpivot.input = child
+ unpivot.ids = ids.map {
+ var expr = Spark_Connect_Expression()
+ expr.expressionString = $0.toExpressionString
+ return expr
+ }
+ if let values {
+ var unpivotValues = Spark_Connect_Unpivot.Values()
+ unpivotValues.values = values.map {
+ var expr = Spark_Connect_Expression()
+ expr.expressionString = $0.toExpressionString
+ return expr
+ }
+ unpivot.values = unpivotValues
+ }
+ unpivot.variableColumnName = variableColumnName
+ unpivot.valueColumnName = valueColumnName
+ var relation = Relation()
+ relation.unpivot = unpivot
+ var plan = Plan()
+ plan.opType = .root(relation)
+ return plan
+ }
+
+ static func getTranspose(_ child: Relation, _ indexColumn: [String]) -> Plan {
+ var transpose = Spark_Connect_Transpose()
+ transpose.input = child
+ transpose.indexColumns = indexColumn.map {
+ var expr = Spark_Connect_Expression()
+ expr.expressionString = $0.toExpressionString
+ return expr
+ }
+ var relation = Relation()
+ relation.transpose = transpose
+ var plan = Plan()
+ plan.opType = .root(relation)
+ return plan
+ }
+
func createTempView(
_ child: Relation, _ viewName: String, replace: Bool, isGlobal: Bool
) async throws {
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index bea34ed..7b6c4c2 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -788,6 +788,51 @@ struct DataFrameTests {
#expect(try await spark.sql("SELECT 1 a, 2 b, 3 c").toJSON().collect() == expected)
await spark.stop()
}
+
+ @Test
+ func unpivot() async throws {
+ let spark = try await SparkSession.builder.getOrCreate()
+ let df = try await spark.sql(
+ """
+ SELECT * FROM
+ VALUES (1, 11, 12L),
+ (2, 21, 22L)
+ T(id, int, long)
+ """)
+ let expected = [
+ Row(1, "int", 11),
+ Row(1, "long", 12),
+ Row(2, "int", 21),
+ Row(2, "long", 22),
+ ]
+ #expect(try await df.unpivot(["id"], ["int", "long"], "variable", "value").collect() == expected)
+ #expect(try await df.melt(["id"], ["int", "long"], "variable", "value").collect() == expected)
+ await spark.stop()
+ }
+
+ @Test
+ func transpose() async throws {
+ let spark = try await SparkSession.builder.getOrCreate()
+ if await spark.version.starts(with: "4.") {
+ #expect(try await spark.range(1).transpose().columns == ["key", "0"])
+ #expect(try await spark.range(1).transpose().count() == 0)
+
+ let df = try await spark.sql(
+ """
+ SELECT * FROM
+ VALUES ('A', 1, 2),
+ ('B', 3, 4)
+ T(id, val1, val2)
+ """)
+ let expected = [
+ Row("val1", 1, 3),
+ Row("val2", 2, 4),
+ ]
+ #expect(try await df.transpose().collect() == expected)
+ #expect(try await df.transpose("id").collect() == expected)
+ }
+ await spark.stop()
+ }
#endif
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]