This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b35574e  [SPARK-44368] Support Repartition and RepartitionByRange in 
Spark Connect Go Client
b35574e is described below

commit b35574eb9cd5a86ef9a5e602f312cbdfaf1c55f8
Author: hiboyang <14280154+hiboy...@users.noreply.github.com>
AuthorDate: Thu Aug 3 08:12:57 2023 +0800

    [SPARK-44368] Support Repartition and RepartitionByRange in Spark Connect 
Go Client
    
    ### What changes were proposed in this pull request?
    
    Support Repartition and RepartitionByRange in Spark Connect Go Client
    
    ### Why are the changes needed?
    
    This is to support more functionalities in Spark Connect Go Client.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users will be able to repartition a data frame in the Spark Connect Go
Client, e.g.
    ```
    dataFrame.Repartition(0, []string{"word"})
    
    dataFrame.RepartitionByRange(0, []sql.RangePartitionColumn{
                    {
                            Name:       "word",
                            Descending: true,
                    },
            })
    ```
    
    ### How was this patch tested?
    
    Tested by running example code
    
    Closes #13 from hiboyang/bo-dev-04.
    
    Authored-by: hiboyang <14280154+hiboy...@users.noreply.github.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 client/sql/dataframe.go                         | 82 +++++++++++++++++++++++++
 cmd/spark-connect-example-spark-session/main.go | 57 +++++++++++++++++
 2 files changed, 139 insertions(+)

diff --git a/client/sql/dataframe.go b/client/sql/dataframe.go
index f2a0747..773f714 100644
--- a/client/sql/dataframe.go
+++ b/client/sql/dataframe.go
@@ -39,6 +39,15 @@ type DataFrame interface {
        Write() DataFrameWriter
        // CreateTempView creates or replaces a temporary view.
        CreateTempView(viewName string, replace bool, global bool) error
+       // Repartition re-partitions a data frame.
+       Repartition(numPartitions int, columns []string) (DataFrame, error)
+       // RepartitionByRange re-partitions a data frame by range partition.
+       RepartitionByRange(numPartitions int, columns []RangePartitionColumn) 
(DataFrame, error)
+}
+
// RangePartitionColumn describes one column used by
// DataFrame.RepartitionByRange, together with the sort direction applied
// when computing the range boundaries.
type RangePartitionColumn struct {
	// Name is the column name to range-partition by.
	Name string
	// Descending selects descending sort order for this column when true;
	// the default (false) is ascending.
	Descending bool
}
 
 // dataFrameImpl is an implementation of DataFrame interface.
@@ -183,6 +192,54 @@ func (df *dataFrameImpl) CreateTempView(viewName string, 
replace bool, global bo
        return consumeExecutePlanClient(responseClient)
 }
 
+func (df *dataFrameImpl) Repartition(numPartitions int, columns []string) 
(DataFrame, error) {
+       var partitionExpressions []*proto.Expression
+       if columns != nil {
+               partitionExpressions = make([]*proto.Expression, 0, 
len(columns))
+               for _, c := range columns {
+                       expr := &proto.Expression{
+                               ExprType: 
&proto.Expression_UnresolvedAttribute_{
+                                       UnresolvedAttribute: 
&proto.Expression_UnresolvedAttribute{
+                                               UnparsedIdentifier: c,
+                                       },
+                               },
+                       }
+                       partitionExpressions = append(partitionExpressions, 
expr)
+               }
+       }
+       return df.repartitionByExpressions(numPartitions, partitionExpressions)
+}
+
+func (df *dataFrameImpl) RepartitionByRange(numPartitions int, columns 
[]RangePartitionColumn) (DataFrame, error) {
+       var partitionExpressions []*proto.Expression
+       if columns != nil {
+               partitionExpressions = make([]*proto.Expression, 0, 
len(columns))
+               for _, c := range columns {
+                       columnExpr := &proto.Expression{
+                               ExprType: 
&proto.Expression_UnresolvedAttribute_{
+                                       UnresolvedAttribute: 
&proto.Expression_UnresolvedAttribute{
+                                               UnparsedIdentifier: c.Name,
+                                       },
+                               },
+                       }
+                       direction := 
proto.Expression_SortOrder_SORT_DIRECTION_ASCENDING
+                       if c.Descending {
+                               direction = 
proto.Expression_SortOrder_SORT_DIRECTION_DESCENDING
+                       }
+                       sortExpr := &proto.Expression{
+                               ExprType: &proto.Expression_SortOrder_{
+                                       SortOrder: &proto.Expression_SortOrder{
+                                               Child:     columnExpr,
+                                               Direction: direction,
+                                       },
+                               },
+                       }
+                       partitionExpressions = append(partitionExpressions, 
sortExpr)
+               }
+       }
+       return df.repartitionByExpressions(numPartitions, partitionExpressions)
+}
+
 func (df *dataFrameImpl) createPlan() *proto.Plan {
        return &proto.Plan{
                OpType: &proto.Plan_Root{
@@ -196,6 +253,31 @@ func (df *dataFrameImpl) createPlan() *proto.Plan {
        }
 }
 
+func (df *dataFrameImpl) repartitionByExpressions(numPartitions int, 
partitionExpressions []*proto.Expression) (DataFrame, error) {
+       var numPartitionsPointerValue *int32
+       if numPartitions != 0 {
+               int32Value := int32(numPartitions)
+               numPartitionsPointerValue = &int32Value
+       }
+       df.relation.GetRepartitionByExpression()
+       newRelation := &proto.Relation{
+               Common: &proto.RelationCommon{
+                       PlanId: newPlanId(),
+               },
+               RelType: &proto.Relation_RepartitionByExpression{
+                       RepartitionByExpression: &proto.RepartitionByExpression{
+                               Input:          df.relation,
+                               NumPartitions:  numPartitionsPointerValue,
+                               PartitionExprs: partitionExpressions,
+                       },
+               },
+       }
+       return &dataFrameImpl{
+               sparkSession: df.sparkSession,
+               relation:     newRelation,
+       }, nil
+}
+
// showArrowBatch renders one Arrow batch from an ExecutePlanResponse by
// delegating to showArrowBatchData with the batch's raw payload bytes.
func showArrowBatch(arrowBatch *proto.ExecutePlanResponse_ArrowBatch) error {
	return showArrowBatchData(arrowBatch.Data)
}
diff --git a/cmd/spark-connect-example-spark-session/main.go 
b/cmd/spark-connect-example-spark-session/main.go
index c35bbeb..cd21878 100644
--- a/cmd/spark-connect-example-spark-session/main.go
+++ b/cmd/spark-connect-example-spark-session/main.go
@@ -102,4 +102,61 @@ func main() {
 
        log.Printf("DataFrame from sql: select count, word from view1 order by 
count")
        df.Show(100, false)
+
+       log.Printf("Repartition with one partition")
+       df, err = df.Repartition(1, nil)
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       err = df.Write().Mode("overwrite").
+               Format("parquet").
+               
Save("file:///tmp/spark-connect-write-example-output-one-partition.parquet")
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       log.Printf("Repartition with two partitions")
+       df, err = df.Repartition(2, nil)
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       err = df.Write().Mode("overwrite").
+               Format("parquet").
+               
Save("file:///tmp/spark-connect-write-example-output-two-partition.parquet")
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       log.Printf("Repartition with columns")
+       df, err = df.Repartition(0, []string{"word", "count"})
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       err = df.Write().Mode("overwrite").
+               Format("parquet").
+               
Save("file:///tmp/spark-connect-write-example-output-repartition-with-column.parquet")
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       log.Printf("Repartition by range with columns")
+       df, err = df.RepartitionByRange(0, []sql.RangePartitionColumn{
+               {
+                       Name:       "word",
+                       Descending: true,
+               },
+       })
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       err = df.Write().Mode("overwrite").
+               Format("parquet").
+               
Save("file:///tmp/spark-connect-write-example-output-repartition-by-range-with-column.parquet")
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to