This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git
The following commit(s) were added to refs/heads/master by this push: new b35574e [SPARK-44368] Support Repartition and RepartitionByRange in Spark Connect Go Client b35574e is described below commit b35574eb9cd5a86ef9a5e602f312cbdfaf1c55f8 Author: hiboyang <14280154+hiboy...@users.noreply.github.com> AuthorDate: Thu Aug 3 08:12:57 2023 +0800 [SPARK-44368] Support Repartition and RepartitionByRange in Spark Connect Go Client ### What changes were proposed in this pull request? Support Repartition and RepartitionByRange in Spark Connect Go Client ### Why are the changes needed? This is to support more functionality in the Spark Connect Go Client. ### Does this PR introduce _any_ user-facing change? Yes, users will be able to repartition a data frame in the Spark Connect Go Client, e.g. ``` dataFrame.Repartition(0, []string{"word"}) dataFrame.RepartitionByRange(0, []sql.RangePartitionColumn{ { Name: "word", Descending: true, }, }) ``` ### How was this patch tested? Tested by running example code Closes #13 from hiboyang/bo-dev-04. Authored-by: hiboyang <14280154+hiboy...@users.noreply.github.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- client/sql/dataframe.go | 82 +++++++++++++++++++++++++ cmd/spark-connect-example-spark-session/main.go | 57 +++++++++++++++++ 2 files changed, 139 insertions(+) diff --git a/client/sql/dataframe.go b/client/sql/dataframe.go index f2a0747..773f714 100644 --- a/client/sql/dataframe.go +++ b/client/sql/dataframe.go @@ -39,6 +39,15 @@ type DataFrame interface { Write() DataFrameWriter // CreateTempView creates or replaces a temporary view. CreateTempView(viewName string, replace bool, global bool) error + // Repartition re-partitions a data frame. + Repartition(numPartitions int, columns []string) (DataFrame, error) + // RepartitionByRange re-partitions a data frame by range partition. 
+ RepartitionByRange(numPartitions int, columns []RangePartitionColumn) (DataFrame, error) +} + +type RangePartitionColumn struct { + Name string + Descending bool } // dataFrameImpl is an implementation of DataFrame interface. @@ -183,6 +192,54 @@ func (df *dataFrameImpl) CreateTempView(viewName string, replace bool, global bo return consumeExecutePlanClient(responseClient) } +func (df *dataFrameImpl) Repartition(numPartitions int, columns []string) (DataFrame, error) { + var partitionExpressions []*proto.Expression + if columns != nil { + partitionExpressions = make([]*proto.Expression, 0, len(columns)) + for _, c := range columns { + expr := &proto.Expression{ + ExprType: &proto.Expression_UnresolvedAttribute_{ + UnresolvedAttribute: &proto.Expression_UnresolvedAttribute{ + UnparsedIdentifier: c, + }, + }, + } + partitionExpressions = append(partitionExpressions, expr) + } + } + return df.repartitionByExpressions(numPartitions, partitionExpressions) +} + +func (df *dataFrameImpl) RepartitionByRange(numPartitions int, columns []RangePartitionColumn) (DataFrame, error) { + var partitionExpressions []*proto.Expression + if columns != nil { + partitionExpressions = make([]*proto.Expression, 0, len(columns)) + for _, c := range columns { + columnExpr := &proto.Expression{ + ExprType: &proto.Expression_UnresolvedAttribute_{ + UnresolvedAttribute: &proto.Expression_UnresolvedAttribute{ + UnparsedIdentifier: c.Name, + }, + }, + } + direction := proto.Expression_SortOrder_SORT_DIRECTION_ASCENDING + if c.Descending { + direction = proto.Expression_SortOrder_SORT_DIRECTION_DESCENDING + } + sortExpr := &proto.Expression{ + ExprType: &proto.Expression_SortOrder_{ + SortOrder: &proto.Expression_SortOrder{ + Child: columnExpr, + Direction: direction, + }, + }, + } + partitionExpressions = append(partitionExpressions, sortExpr) + } + } + return df.repartitionByExpressions(numPartitions, partitionExpressions) +} + func (df *dataFrameImpl) createPlan() *proto.Plan { return 
&proto.Plan{ OpType: &proto.Plan_Root{ @@ -196,6 +253,31 @@ func (df *dataFrameImpl) createPlan() *proto.Plan { } } +func (df *dataFrameImpl) repartitionByExpressions(numPartitions int, partitionExpressions []*proto.Expression) (DataFrame, error) { + var numPartitionsPointerValue *int32 + if numPartitions != 0 { + int32Value := int32(numPartitions) + numPartitionsPointerValue = &int32Value + } + df.relation.GetRepartitionByExpression() + newRelation := &proto.Relation{ + Common: &proto.RelationCommon{ + PlanId: newPlanId(), + }, + RelType: &proto.Relation_RepartitionByExpression{ + RepartitionByExpression: &proto.RepartitionByExpression{ + Input: df.relation, + NumPartitions: numPartitionsPointerValue, + PartitionExprs: partitionExpressions, + }, + }, + } + return &dataFrameImpl{ + sparkSession: df.sparkSession, + relation: newRelation, + }, nil +} + func showArrowBatch(arrowBatch *proto.ExecutePlanResponse_ArrowBatch) error { return showArrowBatchData(arrowBatch.Data) } diff --git a/cmd/spark-connect-example-spark-session/main.go b/cmd/spark-connect-example-spark-session/main.go index c35bbeb..cd21878 100644 --- a/cmd/spark-connect-example-spark-session/main.go +++ b/cmd/spark-connect-example-spark-session/main.go @@ -102,4 +102,61 @@ func main() { log.Printf("DataFrame from sql: select count, word from view1 order by count") df.Show(100, false) + + log.Printf("Repartition with one partition") + df, err = df.Repartition(1, nil) + if err != nil { + log.Fatalf("Failed: %s", err.Error()) + } + + err = df.Write().Mode("overwrite"). + Format("parquet"). + Save("file:///tmp/spark-connect-write-example-output-one-partition.parquet") + if err != nil { + log.Fatalf("Failed: %s", err.Error()) + } + + log.Printf("Repartition with two partitions") + df, err = df.Repartition(2, nil) + if err != nil { + log.Fatalf("Failed: %s", err.Error()) + } + + err = df.Write().Mode("overwrite"). + Format("parquet"). 
+ Save("file:///tmp/spark-connect-write-example-output-two-partition.parquet") + if err != nil { + log.Fatalf("Failed: %s", err.Error()) + } + + log.Printf("Repartition with columns") + df, err = df.Repartition(0, []string{"word", "count"}) + if err != nil { + log.Fatalf("Failed: %s", err.Error()) + } + + err = df.Write().Mode("overwrite"). + Format("parquet"). + Save("file:///tmp/spark-connect-write-example-output-repartition-with-column.parquet") + if err != nil { + log.Fatalf("Failed: %s", err.Error()) + } + + log.Printf("Repartition by range with columns") + df, err = df.RepartitionByRange(0, []sql.RangePartitionColumn{ + { + Name: "word", + Descending: true, + }, + }) + if err != nil { + log.Fatalf("Failed: %s", err.Error()) + } + + err = df.Write().Mode("overwrite"). + Format("parquet"). + Save("file:///tmp/spark-connect-write-example-output-repartition-by-range-with-column.parquet") + if err != nil { + log.Fatalf("Failed: %s", err.Error()) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org