This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-connect-swift.git


The following commit(s) were added to refs/heads/main by this push:
     new 5ded181  [SPARK-52066] Support `unpivot/melt/transpose` in `DataFrame`
5ded181 is described below

commit 5ded181f8fe0af86b68f83df3b13d7e0d30966ea
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat May 10 10:06:23 2025 -0700

    [SPARK-52066] Support `unpivot/melt/transpose` in `DataFrame`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add `unpivot`, `melt`, `transpose` API of `DataFrame`.
    
    ### Why are the changes needed?
    
    For feature parity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #123 from dongjoon-hyun/SPARK-52066.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 Sources/SparkConnect/DataFrame.swift          | 104 ++++++++++++++++++++++++++
 Sources/SparkConnect/SparkConnectClient.swift |  47 ++++++++++++
 Tests/SparkConnectTests/DataFrameTests.swift  |  45 +++++++++++
 3 files changed, 196 insertions(+)

diff --git a/Sources/SparkConnect/DataFrame.swift b/Sources/SparkConnect/DataFrame.swift
index 5771588..ff30f25 100644
--- a/Sources/SparkConnect/DataFrame.swift
+++ b/Sources/SparkConnect/DataFrame.swift
@@ -109,6 +109,10 @@ import Synchronization
 /// - ``dropDuplicatesWithinWatermark(_:)``
 /// - ``distinct()``
 /// - ``withColumnRenamed(_:_:)``
+/// - ``unpivot(_:_:_:)``
+/// - ``unpivot(_:_:_:_:)``
+/// - ``melt(_:_:_:)``
+/// - ``melt(_:_:_:_:)``
 ///
 /// ### Join Operations
 /// - ``join(_:)``
@@ -1202,6 +1206,106 @@ public actor DataFrame: Sendable {
     return dropDuplicates()
   }
 
+  /// Transposes a DataFrame, switching rows to columns. This function transforms the DataFrame
+  /// such that the values in the first column become the new columns of the DataFrame.
+  /// - Returns: A transposed ``DataFrame``.
+  public func transpose() -> DataFrame {
+    return buildTranspose([])
+  }
+
+  /// Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+  /// set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+  /// which cannot be reversed. This is an alias for `unpivot`.
+  /// - Parameters:
+  ///   - ids: ID column names
+  ///   - values: Value column names to unpivot
+  ///   - variableColumnName: Name of the variable column
+  ///   - valueColumnName: Name of the value column
+  /// - Returns: A ``DataFrame``.
+  public func melt(
+    _ ids: [String],
+    _ values: [String],
+    _ variableColumnName: String,
+    _ valueColumnName: String
+  ) -> DataFrame {
+    return unpivot(ids, values, variableColumnName, valueColumnName)
+  }
+
+  /// Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+  /// set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+  /// which cannot be reversed. This is an alias for `unpivot`.
+  /// - Parameters:
+  ///   - ids: ID column names
+  ///   - variableColumnName: Name of the variable column
+  ///   - valueColumnName: Name of the value column
+  /// - Returns: A ``DataFrame``.
+  public func melt(
+    _ ids: [String],
+    _ variableColumnName: String,
+    _ valueColumnName: String
+  ) -> DataFrame {
+    return unpivot(ids, variableColumnName, valueColumnName)
+  }
+
+  /// Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+  /// set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+  /// which cannot be reversed.
+  /// - Parameters:
+  ///   - ids: ID column names
+  ///   - values: Value column names to unpivot
+  ///   - variableColumnName: Name of the variable column
+  ///   - valueColumnName: Name of the value column
+  /// - Returns: A ``DataFrame``.
+  public func unpivot(
+    _ ids: [String],
+    _ values: [String],
+    _ variableColumnName: String,
+    _ valueColumnName: String
+  ) -> DataFrame {
+    return buildUnpivot(ids, values, variableColumnName, valueColumnName)
+  }
+
+  /// Unpivot a DataFrame from wide format to long format, optionally leaving identifier columns
+  /// set. This is the reverse to `groupBy(...).pivot(...).agg(...)`, except for the aggregation,
+  /// which cannot be reversed.
+  /// - Parameters:
+  ///   - ids: ID column names
+  ///   - variableColumnName: Name of the variable column
+  ///   - valueColumnName: Name of the value column
+  /// - Returns: A ``DataFrame``.
+  public func unpivot(
+    _ ids: [String],
+    _ variableColumnName: String,
+    _ valueColumnName: String
+  ) -> DataFrame {
+    return buildUnpivot(ids, nil, variableColumnName, valueColumnName)
+  }
+
+  func buildUnpivot(
+    _ ids: [String],
+    _ values: [String]?,
+    _ variableColumnName: String,
+    _ valueColumnName: String,
+  ) -> DataFrame {
+    let plan = SparkConnectClient.getUnpivot(self.plan.root, ids, values, variableColumnName, valueColumnName)
+    return DataFrame(spark: self.spark, plan: plan)
+  }
+
+  /// Transposes a ``DataFrame`` such that the values in the specified index column become the new
+  /// columns of the ``DataFrame``.
+  /// - Parameter indexColumn: The single column that will be treated as the index for the transpose operation.
+  /// This column will be used to pivot the data, transforming the DataFrame such that the values of
+  /// the indexColumn become the new columns in the transposed DataFrame.
+  /// - Returns: A transposed ``DataFrame``.
+  public func transpose(_ indexColumn: String) -> DataFrame {
+    return buildTranspose([indexColumn])
+  }
+
+  func buildTranspose(_ indexColumn: [String]) -> DataFrame {
+    let plan = SparkConnectClient.getTranspose(self.plan.root, indexColumn)
+    return DataFrame(spark: self.spark, plan: plan)
+  }
+
   /// Groups the DataFrame using the specified columns.
   ///
   /// This method is used to perform aggregations on groups of data.
diff --git a/Sources/SparkConnect/SparkConnectClient.swift b/Sources/SparkConnect/SparkConnectClient.swift
index 309b5a0..d74528b 100644
--- a/Sources/SparkConnect/SparkConnectClient.swift
+++ b/Sources/SparkConnect/SparkConnectClient.swift
@@ -920,6 +920,53 @@ public actor SparkConnectClient {
     return plan
   }
 
+  static func getUnpivot(
+    _ child: Relation,
+    _ ids: [String],
+    _ values: [String]?,
+    _ variableColumnName: String,
+    _ valueColumnName: String,
+  ) -> Plan {
+    var unpivot = Spark_Connect_Unpivot()
+    unpivot.input = child
+    unpivot.ids = ids.map {
+      var expr = Spark_Connect_Expression()
+      expr.expressionString = $0.toExpressionString
+      return expr
+    }
+    if let values {
+      var unpivotValues = Spark_Connect_Unpivot.Values()
+      unpivotValues.values = values.map {
+        var expr = Spark_Connect_Expression()
+        expr.expressionString = $0.toExpressionString
+        return expr
+      }
+      unpivot.values = unpivotValues
+    }
+    unpivot.variableColumnName = variableColumnName
+    unpivot.valueColumnName = valueColumnName
+    var relation = Relation()
+    relation.unpivot = unpivot
+    var plan = Plan()
+    plan.opType = .root(relation)
+    return plan
+  }
+
+  static func getTranspose(_ child: Relation, _ indexColumn: [String]) -> Plan {
+    var transpose = Spark_Connect_Transpose()
+    transpose.input = child
+    transpose.indexColumns = indexColumn.map {
+      var expr = Spark_Connect_Expression()
+      expr.expressionString = $0.toExpressionString
+      return expr
+    }
+    var relation = Relation()
+    relation.transpose = transpose
+    var plan = Plan()
+    plan.opType = .root(relation)
+    return plan
+  }
+
   func createTempView(
     _ child: Relation, _ viewName: String, replace: Bool, isGlobal: Bool
   ) async throws {
diff --git a/Tests/SparkConnectTests/DataFrameTests.swift b/Tests/SparkConnectTests/DataFrameTests.swift
index bea34ed..7b6c4c2 100644
--- a/Tests/SparkConnectTests/DataFrameTests.swift
+++ b/Tests/SparkConnectTests/DataFrameTests.swift
@@ -788,6 +788,51 @@ struct DataFrameTests {
     #expect(try await spark.sql("SELECT 1 a, 2 b, 3 c").toJSON().collect() == expected)
     await spark.stop()
   }
+
+  @Test
+  func unpivot() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    let df = try await spark.sql(
+      """
+      SELECT * FROM
+      VALUES (1, 11, 12L),
+             (2, 21, 22L)
+      T(id, int, long)
+      """)
+    let expected = [
+      Row(1, "int", 11),
+      Row(1, "long", 12),
+      Row(2, "int", 21),
+      Row(2, "long", 22),
+    ]
+    #expect(try await df.unpivot(["id"], ["int", "long"], "variable", "value").collect() == expected)
+    #expect(try await df.melt(["id"], ["int", "long"], "variable", "value").collect() == expected)
+    await spark.stop()
+  }
+
+  @Test
+  func transpose() async throws {
+    let spark = try await SparkSession.builder.getOrCreate()
+    if await spark.version.starts(with: "4.") {
+      #expect(try await spark.range(1).transpose().columns == ["key", "0"])
+      #expect(try await spark.range(1).transpose().count() == 0)
+      
+      let df = try await spark.sql(
+      """
+      SELECT * FROM
+      VALUES ('A', 1, 2),
+             ('B', 3, 4)
+      T(id, val1, val2)
+      """)
+      let expected = [
+        Row("val1", 1, 3),
+        Row("val2", 2, 4),
+      ]
+      #expect(try await df.transpose().collect() == expected)
+      #expect(try await df.transpose("id").collect() == expected)
+    }
+    await spark.stop()
+  }
 #endif
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to