This is an automated email from the ASF dual-hosted git repository.

kamilwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 343fd2a  [BEAM-11075] Customization of key size and value size (#13149)
343fd2a is described below

commit 343fd2a7e8d8facb38f0dbd330b1f43db2fceb72
Author: Kamil Wasilewski <[email protected]>
AuthorDate: Wed Oct 21 11:09:45 2020 +0200

    [BEAM-11075] Customization of key size and value size (#13149)
---
 sdks/go/pkg/beam/io/synthetic/source.go      | 60 +++++++++++++++++++++++++---
 sdks/go/pkg/beam/io/synthetic/source_test.go | 55 +++++++++++++++++++++++++
 2 files changed, 110 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/io/synthetic/source.go b/sdks/go/pkg/beam/io/synthetic/source.go
index 4e50cce..f023de0 100644
--- a/sdks/go/pkg/beam/io/synthetic/source.go
+++ b/sdks/go/pkg/beam/io/synthetic/source.go
@@ -23,6 +23,8 @@
 package synthetic
 
 import (
+       "bytes"
+       "encoding/json"
        "fmt"
        "github.com/apache/beam/sdks/go/pkg/beam/core/sdf"
        "math/rand"
@@ -125,10 +127,10 @@ func (fn *sourceFn) Setup() {
 // ProcessElement creates a number of random elements based on the restriction
 // tracker received. Each element is a random byte slice key and value, in the
 // form of KV<[]byte, []byte>.
-func (fn *sourceFn) ProcessElement(rt *sdf.LockRTracker, _ SourceConfig, emit func([]byte, []byte)) error {
+func (fn *sourceFn) ProcessElement(rt *sdf.LockRTracker, config SourceConfig, emit func([]byte, []byte)) error {
        for i := rt.GetRestriction().(offsetrange.Restriction).Start; rt.TryClaim(i) == true; i++ {
-               key := make([]byte, 8)
-               val := make([]byte, 8)
+               key := make([]byte, config.KeySize)
+               val := make([]byte, config.ValueSize)
                if _, err := fn.rng.Read(key); err != nil {
                        return err
                }
@@ -165,6 +167,8 @@ func DefaultSourceConfig() *SourceConfigBuilder {
                cfg: SourceConfig{
                        NumElements:   1, // 0 is invalid (drops elements).
                        InitialSplits: 1, // 0 is invalid (drops elements).
+                       KeySize:       8, // 0 is invalid (drops elements).
+                       ValueSize:     8, // 0 is invalid (drops elements).
                },
        }
 }
@@ -197,6 +201,24 @@ func (b *SourceConfigBuilder) InitialSplits(val int) *SourceConfigBuilder {
        return b
 }
 
+// KeySize determines the size of the key of elements for the source to
+// generate.
+//
+// Valid values are in the range of [1, ...] and the default value is 8.
+func (b *SourceConfigBuilder) KeySize(val int) *SourceConfigBuilder {
+       b.cfg.KeySize = val
+       return b
+}
+
+// ValueSize determines the size of the value of elements for the source to
+// generate.
+//
+// Valid values are in the range of [1, ...] and the default value is 8.
+func (b *SourceConfigBuilder) ValueSize(val int) *SourceConfigBuilder {
+       b.cfg.ValueSize = val
+       return b
+}
+
 // Build constructs the SourceConfig initialized by this builder. It also
 // performs error checking on the fields, and panics if any have been set to
 // invalid values.
@@ -207,6 +229,32 @@ func (b *SourceConfigBuilder) Build() SourceConfig {
        if b.cfg.NumElements <= 0 {
                panic(fmt.Sprintf("SourceConfig.NumElements must be >= 1. Got: %v", b.cfg.NumElements))
        }
+       if b.cfg.KeySize <= 0 {
+               panic(fmt.Sprintf("SourceConfig.KeySize must be >= 1. Got: %v", b.cfg.KeySize))
+       }
+       if b.cfg.ValueSize <= 0 {
+               panic(fmt.Sprintf("SourceConfig.ValueSize must be >= 1. Got: %v", b.cfg.ValueSize))
+       }
+       return b.cfg
+}
+
+// BuildFromJSON constructs the SourceConfig by populating it with the parsed
+// JSON. Panics if there is an error in the syntax of the JSON or if the input
+// contains unknown object keys.
+//
+// An example of valid JSON object:
+// {
+//      "num_records": 5,
+//      "key_size": 5,
+//      "value_size": 5
+// }
+func (b *SourceConfigBuilder) BuildFromJSON(jsonData []byte) SourceConfig {
+       decoder := json.NewDecoder(bytes.NewReader(jsonData))
+       decoder.DisallowUnknownFields()
+
+       if err := decoder.Decode(&b.cfg); err != nil {
+               panic(fmt.Sprintf("Could not unmarshal SourceConfig: %v", err))
+       }
        return b.cfg
 }
 
@@ -214,6 +262,8 @@ func (b *SourceConfigBuilder) Build() SourceConfig {
 // synthetic source. It should be created via a SourceConfigBuilder, not by
 // directly initializing it (the fields are public to allow encoding).
 type SourceConfig struct {
-       NumElements   int
-       InitialSplits int
+       NumElements   int `json:"num_records"`
+       InitialSplits int `json:"initial_splits"`
+       KeySize       int `json:"key_size"`
+       ValueSize     int `json:"value_size"`
 }
diff --git a/sdks/go/pkg/beam/io/synthetic/source_test.go b/sdks/go/pkg/beam/io/synthetic/source_test.go
index 0bd5bd0..4ca3393 100644
--- a/sdks/go/pkg/beam/io/synthetic/source_test.go
+++ b/sdks/go/pkg/beam/io/synthetic/source_test.go
@@ -48,6 +48,38 @@ func TestSourceConfig_NumElements(t *testing.T) {
        }
 }
 
+// TestSourceConfig_KeyValueSize tests that setting the size of the key and the
+// value works correctly.
+func TestSourceConfig_KeyValueSize(t *testing.T) {
+       tests := []struct {
+               size int
+               want int
+       }{
+               {size: 1, want: 1},
+               {size: 42, want: 42},
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(fmt.Sprintf("(size = %v)", test.size), func(t *testing.T) {
+                       dfn := sourceFn{}
+                       cfg := DefaultSourceConfig().KeySize(test.size).ValueSize(test.size).Build()
+
+                       keys, values, err := simulateSourceFn(t, &dfn, cfg)
+                       if err != nil {
+                               t.Errorf("Failure processing sourceFn: %v", err)
+                       }
+                       if got := len(keys[0]); got != test.want {
+                               t.Errorf("SourceFn emitted keys of wrong size: got: %v, want: %v",
+                                       got, test.want)
+                       }
+                       if got := len(values[0]); got != test.want {
+                               t.Errorf("SourceFn emitted values of wrong size: got: %v, want: %v",
+                                       got, test.want)
+                       }
+               })
+       }
+}
+
 // TestSourceConfig_InitialSplits tests that the InitialSplits config option
 // works correctly.
 func TestSourceConfig_InitialSplits(t *testing.T) {
@@ -107,6 +139,29 @@ func TestSourceConfig_InitialSplits(t *testing.T) {
        })
 }
 
+// TestSourceConfig_BuildFromJSON tests correctness of building the
+// SourceConfig from JSON data.
+func TestSourceConfig_BuildFromJSON(t *testing.T) {
+       tests := []struct {
+               jsonData string
+               want     SourceConfig
+       }{
+               {
+                       jsonData: "{\"num_records\": 5, \"key_size\": 2, \"value_size\": 3}",
+                       want:     DefaultSourceConfig().NumElements(5).KeySize(2).ValueSize(3).Build(),
+               },
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(fmt.Sprintf("(jsonData = %v)", test.jsonData), func(t *testing.T) {
+                       got := DefaultSourceConfig().BuildFromJSON([]byte(test.jsonData))
+                       if got != test.want {
+                               t.Errorf("Invalid SourceConfig: got: %#v, want: %#v", got, test.want)
+                       }
+               })
+       }
+}
+
 // simulateSourceFn calls CreateInitialRestriction, SplitRestriction,
 // CreateTracker, and ProcessElement on the given sourceFn with the given
 // SourceConfig, and outputs the resulting output elements. This method isn't

Reply via email to