[ 
https://issues.apache.org/jira/browse/BEAM-8292?focusedWorklogId=408535&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-408535
 ]

ASF GitHub Bot logged work on BEAM-8292:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Mar/20 04:20
            Start Date: 24/Mar/20 04:20
    Worklog Time Spent: 10m 
      Work Description: youngoli commented on pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396887966
 
 

 ##########
 File path: sdks/go/pkg/beam/gbk.go
 ##########
 @@ -95,3 +95,52 @@ func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error) {
        ret.SetCoder(NewCoder(ret.Type()))
        return ret, nil
 }
+
+// Reshuffle copies a PCollection of the same kind and using the same element
+// coder, and maintains the same windowing information. Importantly, it allows
+// the result PCollection to be processed with a different sharding, in a
+// different stage than the input PCollection.
+//
+// For example, if a computation needs a lot of parallelism but
+// produces only a small amount of output data, then the computation
+// producing the data can run with as much parallelism as needed,
+// while the output file is written with a smaller amount of
+// parallelism, using the following pattern:
+//
+//   pc := bigHairyComputationNeedingParallelism(scope) // PCollection<string>
+//   resharded := beam.Reshard(scope, pc)                // PCollection<string>
+//
+// Another use case is when one has a non-deterministic DoFn followed by one
+// that performs externally-visible side effects. Inserting a Reshard
+// between these DoFns ensures that retries of the second DoFn will always be
+// the same, which is necessary to make side effects idempotent.
+//
+// A Reshuffle will force a break in the optimized pipeline. Consequently,
+// this operation should be used sparingly, only after determining that the
+// pipeline without reshard is broken in some way and performing an extra
+// operation is worth the cost.
+func Reshuffle(s Scope, col PCollection) PCollection {
+       return Must(TryReshuffle(s, col))
+}
+
+// TryReshuffle inserts a Reshard into the pipeline, and returns an error if
 
 Review comment:
   Same as the previous comment: this doc comment says Reshard instead of
   Reshuffle. The error message a few lines below has the same problem.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 408535)

> Add a Reshuffle PTransform preventing fusion of the surrounding transforms
> --------------------------------------------------------------------------
>
>                 Key: BEAM-8292
>                 URL: https://issues.apache.org/jira/browse/BEAM-8292
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: John Patoch
>            Assignee: Robert Burke
>            Priority: Minor
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> Reshuffle is a PTransform that takes a PCollection<A> and shuffles the data
> to help increase parallelism.
> Reshuffle adds a temporary random key to each element, performs a
> GroupByKey, and finally removes the temporary key.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
