youngoli commented on a change in pull request #11747:
URL: https://github.com/apache/beam/pull/11747#discussion_r427004634



##########
File path: sdks/go/pkg/beam/io/synthetic/step.go
##########
@@ -144,49 +143,130 @@ func (fn *sdfStepFn) Setup() {
 // ProcessElement takes an input and either filters it or produces a number of
 // outputs identical to that input based on the restriction size.
 func (fn *sdfStepFn) ProcessElement(rt *offsetrange.Tracker, key, val []byte, emit func([]byte, []byte)) {
-       if fn.cfg.FilterRatio > 0 && fn.rng.Float64() < fn.cfg.FilterRatio {
-               return
-       }
+       filtered := fn.cfg.filterRatio > 0 && fn.rng.Float64() < fn.cfg.filterRatio
+
        for i := rt.Rest.Start; rt.TryClaim(i) == true; i++ {
-               emit(key, val)
+               if !filtered {
+                       emit(key, val)
+               }
+       }
+}
+
+// StepConfigBuilder is used to initialize StepConfigs. See StepConfigBuilder's
+// methods for descriptions of the fields in a StepConfig and how they can be
+// set. The intended approach for using this builder is to begin by calling the
+// DefaultStepConfig function, followed by calling setters, followed by calling
+// Build.
+//
+// Usage example:
+//
+//    cfg := synthetic.DefaultStepConfig().OutputPerInput(10).FilterRatio(0.5).Build()
+type StepConfigBuilder struct {
+       cfg StepConfig
+}
+
+// DefaultStepConfig creates a StepConfig with intended defaults for the
+// StepConfig fields. This function is the intended starting point for
+// initializing a StepConfig and should always be used to create
+// StepConfigBuilders.
+//
+// To see descriptions of the various StepConfig fields and their defaults, see
+// the methods of StepConfigBuilder.
+func DefaultStepConfig() *StepConfigBuilder {
+       return &StepConfigBuilder{
+               cfg: StepConfig{
+                       outputPerInput: 1,     // Defaults shouldn't drop elements, so at least 1.
+                       filterRatio:    0.0,   // Defaults shouldn't drop elements, so don't filter.
+                       splittable:     false, // Default to non-splittable, SDFs are situational.
+                       initialSplits:  1,     // Defaults to 1, i.e. no initial splitting.
+               },
        }
 }
 
-// DefaultSourceConfig creates a SourceConfig with intended defaults for its
-// fields. SourceConfigs should be initialized with this method.
-func DefaultStepConfig() StepConfig {
-       return StepConfig{
-               OutputPerInput: 1,     // Defaults shouldn't drop elements, so 
at least 1.
-               FilterRatio:    0.0,   // Defaults shouldn't drop elements, so 
don't filter.
-               Splittable:     false, // Default to non-splittable, SDFs are 
situational.
-               InitialSplits:  1,     // Defaults to 1, i.e. no initial 
splitting.
+// OutputPerInput is the number of outputs to emit per input received. Each
+// output is identical to the original input. A value of 0 drops all inputs and
+// produces no output.
+//
+// Valid values are in the range of [0, ...] and the default value is 1. Values
+// below 0 are invalid as they have no logical meaning for this field.
+func (b *StepConfigBuilder) OutputPerInput(val int) *StepConfigBuilder {
+       b.cfg.outputPerInput = val
+       return b
+}
+
+// FilterRatio indicates the random chance that an input will be filtered
+// out, meaning that no outputs will get emitted for it. For example, a
+// FilterRatio of 0.25 means that 25% of inputs will be filtered out, a
+// FilterRatio of 0 means no elements are filtered, and a FilterRatio of 1.0
+// means every element is filtered.
+//
+// In a non-splittable step, this is performed on each input element, meaning
+// all outputs for that element would be filtered. In a splittable step, this is
+// performed on each input restriction instead of the entire element, meaning
+// that some outputs for an element may be filtered and others kept.
+//
+// Note that even when elements are filtered out, the work associated with
+// processing those elements is still performed, which differs from setting an
+// OutputPerInput of 0.
+//
+// Valid values are in the range of [0.0, 1.0], and the default value is 0. In
+// order to avoid precision errors, invalid values do not cause errors. Instead,
+// values below 0 are functionally equivalent to 0, and values above 1 are
+// functionally equivalent to 1.
+func (b *StepConfigBuilder) FilterRatio(val float64) *StepConfigBuilder {
+       b.cfg.filterRatio = val
+       return b
+}
+
+// Splittable indicates whether the step should use the splittable DoFn or
+// non-splittable DoFn implementation.
+//
+// Splittable steps will split along restrictions representing the number of
+// OutputPerInput for each element, so it is most useful for steps with a high
+// OutputPerInput. Conversely, if OutputPerInput is 1, then there is no way to
+// split restrictions further, so making the step splittable will do nothing.
+func (b *StepConfigBuilder) Splittable(val bool) *StepConfigBuilder {
+       b.cfg.splittable = val
+       return b
+}
+
+// InitialSplits is only applicable if Splittable is set to true, and determines
+// the number of initial splits to perform in the step's SplitRestriction
+// method. Restrictions in synthetic steps represent the number of elements to
+// emit for each input element, as defined by the OutputPerInput config field,
+// and this split is performed evenly across that number of elements.
+//
+// Each resulting restriction will have at least 1 element in it, and each
+// element being emitted will be contained in exactly one restriction. That
+// means that if the desired number of splits is greater than the OutputPerInput
+// N, then N initial restrictions will be created, each containing 1 element.
+//
+// Valid values are in the range of [1, ...] and the default value is 1. Values
+// of 0 (and below) are invalid as they would result in dropping elements that
+// are expected to be emitted.
+func (b *StepConfigBuilder) InitialSplits(val int) *StepConfigBuilder {
+       b.cfg.initialSplits = val
+       return b
+}
+
+// Build constructs the StepConfig initialized by this builder. It also performs
+// error checking on the fields, and panics if any have been set to invalid
+// values.
+func (b *StepConfigBuilder) Build() StepConfig {
+       if b.cfg.initialSplits <= 0 {
+               panic(fmt.Sprintf("StepConfig.InitialSplits must be >= 1. Got: %v", b.cfg.initialSplits))
+       }
+       if b.cfg.outputPerInput < 0 {
+               panic(fmt.Sprintf("StepConfig.OutputPerInput cannot be negative. Got: %v", b.cfg.outputPerInput))
        }
+       return b.cfg
 }
 
 // StepConfig is a struct containing all the configuration options for a
-// synthetic step.
+// synthetic step. It should be created via a StepConfigBuilder.
 type StepConfig struct {
-       // OutputPerInput is the number of outputs to emit per input received. 
Each
-       // output is identical to the original input. A value of 0 drops each 
input.
-       OutputPerInput int
-
-       // FilterRatio indicates the random chance that an input will be 
filtered
-       // out, meaning that no outputs will get emitted for it. For example, a
-       // FilterRatio of 0.25 means that 25% of inputs will get filtered out.
-       FilterRatio float64
-
-       // Splittable indicates whether the step should use the splittable DoFn 
or
-       // non-splittable DoFn implementation. Splittable steps will split the
-       // number of OutputPerInput into restrictions, so it is most useful for
-       // steps with a high OutputPerInput.
-       Splittable bool
-
-       // InitialSplits is only applicable if Splittable is set to true, and
-       // determines the number of initial splits to perform in the step's
-       // SplitRestriction method. Note that in some edge cases, the number of
-       // splits performed might differ from this config value. Each 
restriction
-       // will always have one element in it, and at least one restriction will
-       // always be output, so the number of splits will be in the range of 
[1, N]
-       // where N is the size of the original restriction.
-       InitialSplits int
+       outputPerInput int

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to