[ https://issues.apache.org/jira/browse/BEAM-8292?focusedWorklogId=408535&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-408535 ]
ASF GitHub Bot logged work on BEAM-8292:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Mar/20 04:20
            Start Date: 24/Mar/20 04:20
    Worklog Time Spent: 10m
      Work Description: youngoli commented on pull request #11197: [BEAM-8292] Portable Reshuffle for Go SDK
URL: https://github.com/apache/beam/pull/11197#discussion_r396887966

 ##########
 File path: sdks/go/pkg/beam/gbk.go
 ##########
 @@ -95,3 +95,52 @@ func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error) {
 	ret.SetCoder(NewCoder(ret.Type()))
 	return ret, nil
 }
+
+// Reshuffle copies a PCollection of the same kind and using the same element
+// coder, and maintains the same windowing information. Importantly, it allows
+// the result PCollection to be processed with a different sharding, in a
+// different stage than the input PCollection.
+//
+// For example, if a computation needs a lot of parallelism but
+// produces only a small amount of output data, then the computation
+// producing the data can run with as much parallelism as needed,
+// while the output file is written with a smaller amount of
+// parallelism, using the following pattern:
+//
+//   pc := bigHairyComputationNeedingParallelism(scope) // PCollection<string>
+//   resharded := beam.Reshard(scope, pc)                // PCollection<string>
+//
+// Another use case is when one has a non-deterministic DoFn followed by one
+// that performs externally-visible side effects. Inserting a Reshard
+// between these DoFns ensures that retries of the second DoFn will always be
+// the same, which is necessary to make side effects idempotent.
+//
+// A Reshuffle will force a break in the optimized pipeline. Consequently,
+// this operation should be used sparingly, only after determining that the
+// pipeline without reshard is broken in some way and performing an extra
+// operation is worth the cost.
+func Reshuffle(s Scope, col PCollection) PCollection {
+	return Must(TryReshuffle(s, col))
+}
+
+// TryReshuffle inserts a Reshard into the pipeline, and returns an error if

 Review comment:
 Same as previous comment, using Reshard instead of Reshuffle. The error message a few lines below also does that.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 408535)

> Add a Reshuffle PTransform preventing fusion of the surrounding transforms
> --------------------------------------------------------------------------
>
>                 Key: BEAM-8292
>                 URL: https://issues.apache.org/jira/browse/BEAM-8292
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-go
>            Reporter: John Patoch
>            Assignee: Robert Burke
>            Priority: Minor
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> Reshuffle is a PTransform that takes a PCollection<A> and shuffles the data
> to help increase parallelism.
> Reshuffle adds a temporary random key to each element, performs a
> GroupByKey, and finally removes the temporary key.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)