This is an automated email from the ASF dual-hosted git repository.
kamilwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 343fd2a [BEAM-11075] Customization of key size and value size (#13149)
343fd2a is described below
commit 343fd2a7e8d8facb38f0dbd330b1f43db2fceb72
Author: Kamil Wasilewski <[email protected]>
AuthorDate: Wed Oct 21 11:09:45 2020 +0200
[BEAM-11075] Customization of key size and value size (#13149)
---
sdks/go/pkg/beam/io/synthetic/source.go | 60 +++++++++++++++++++++++++---
sdks/go/pkg/beam/io/synthetic/source_test.go | 55 +++++++++++++++++++++++++
2 files changed, 110 insertions(+), 5 deletions(-)
diff --git a/sdks/go/pkg/beam/io/synthetic/source.go b/sdks/go/pkg/beam/io/synthetic/source.go
index 4e50cce..f023de0 100644
--- a/sdks/go/pkg/beam/io/synthetic/source.go
+++ b/sdks/go/pkg/beam/io/synthetic/source.go
@@ -23,6 +23,8 @@
package synthetic
import (
+ "bytes"
+ "encoding/json"
"fmt"
"github.com/apache/beam/sdks/go/pkg/beam/core/sdf"
"math/rand"
@@ -125,10 +127,10 @@ func (fn *sourceFn) Setup() {
// ProcessElement creates a number of random elements based on the restriction
// tracker received. Each element is a random byte slice key and value, in the
// form of KV<[]byte, []byte>.
-func (fn *sourceFn) ProcessElement(rt *sdf.LockRTracker, _ SourceConfig, emit
func([]byte, []byte)) error {
+func (fn *sourceFn) ProcessElement(rt *sdf.LockRTracker, config SourceConfig,
emit func([]byte, []byte)) error {
for i := rt.GetRestriction().(offsetrange.Restriction).Start;
rt.TryClaim(i) == true; i++ {
- key := make([]byte, 8)
- val := make([]byte, 8)
+ key := make([]byte, config.KeySize)
+ val := make([]byte, config.ValueSize)
if _, err := fn.rng.Read(key); err != nil {
return err
}
@@ -165,6 +167,8 @@ func DefaultSourceConfig() *SourceConfigBuilder {
cfg: SourceConfig{
NumElements: 1, // 0 is invalid (drops elements).
InitialSplits: 1, // 0 is invalid (drops elements).
+ KeySize: 8, // 0 is invalid (Build panics on sizes < 1).
+ ValueSize: 8, // 0 is invalid (Build panics on sizes < 1).
},
}
}
@@ -197,6 +201,24 @@ func (b *SourceConfigBuilder) InitialSplits(val int)
*SourceConfigBuilder {
return b
}
+// KeySize sets the length, in bytes, of the random key of every element the
+// source generates, and returns the builder so that calls can be chained.
+//
+// Valid values are >= 1 (enforced by Build, not here); the default value is 8.
+func (b *SourceConfigBuilder) KeySize(val int) *SourceConfigBuilder {
+ b.cfg.KeySize = val
+ return b
+}
+
+// ValueSize sets the length, in bytes, of the random value of every element
+// the source generates, and returns the builder so that calls can be chained.
+//
+// Valid values are >= 1 (enforced by Build, not here); the default value is 8.
+func (b *SourceConfigBuilder) ValueSize(val int) *SourceConfigBuilder {
+ b.cfg.ValueSize = val
+ return b
+}
+
// Build constructs the SourceConfig initialized by this builder. It also
// performs error checking on the fields, and panics if any have been set to
// invalid values.
@@ -207,6 +229,32 @@ func (b *SourceConfigBuilder) Build() SourceConfig {
if b.cfg.NumElements <= 0 {
panic(fmt.Sprintf("SourceConfig.NumElements must be >= 1. Got:
%v", b.cfg.NumElements))
}
+ if b.cfg.KeySize <= 0 {
+ panic(fmt.Sprintf("SourceConfig.KeySize must be >= 1. Got: %v",
b.cfg.KeySize))
+ }
+ if b.cfg.ValueSize <= 0 {
+ panic(fmt.Sprintf("SourceConfig.ValueSize must be >= 1. Got:
%v", b.cfg.ValueSize))
+ }
+ return b.cfg
+}
+
+// BuildFromJSON constructs the SourceConfig by overlaying the parsed JSON on
+// top of the values already held by this builder, so fields absent from the
+// JSON keep their current (e.g. default) values.
+//
+// The resulting configuration is validated by Build. Panics if the JSON is
+// malformed, if it contains unknown object keys, or if any field ends up with
+// an invalid value.
+//
+// An example of valid JSON object:
+// {
+//   "num_records": 5,
+//   "initial_splits": 2,
+//   "key_size": 5,
+//   "value_size": 5
+// }
+func (b *SourceConfigBuilder) BuildFromJSON(jsonData []byte) SourceConfig {
+	decoder := json.NewDecoder(bytes.NewReader(jsonData))
+	// Reject keys that do not map to a SourceConfig field instead of silently
+	// ignoring them; a misspelled key would otherwise go unnoticed.
+	decoder.DisallowUnknownFields()
+
+	if err := decoder.Decode(&b.cfg); err != nil {
+		panic(fmt.Sprintf("Could not unmarshal SourceConfig: %v", err))
+	}
+	// Decoding writes straight into b.cfg and bypasses the builder's checks,
+	// so delegate to Build to reject non-positive counts and sizes.
+	return b.Build()
+}
@@ -214,6 +262,8 @@ func (b *SourceConfigBuilder) Build() SourceConfig {
// synthetic source. It should be created via a SourceConfigBuilder, not by
// directly initializing it (the fields are public to allow encoding).
type SourceConfig struct {
- NumElements int
- InitialSplits int
+ NumElements int `json:"num_records"` // Must be >= 1; Build panics otherwise.
+ InitialSplits int `json:"initial_splits"` // 0 is invalid (see DefaultSourceConfig).
+ KeySize int `json:"key_size"` // Length in bytes of each generated key; Build panics if < 1.
+ ValueSize int `json:"value_size"` // Length in bytes of each generated value; Build panics if < 1.
}
diff --git a/sdks/go/pkg/beam/io/synthetic/source_test.go b/sdks/go/pkg/beam/io/synthetic/source_test.go
index 0bd5bd0..4ca3393 100644
--- a/sdks/go/pkg/beam/io/synthetic/source_test.go
+++ b/sdks/go/pkg/beam/io/synthetic/source_test.go
@@ -48,6 +48,38 @@ func TestSourceConfig_NumElements(t *testing.T) {
}
}
+// TestSourceConfig_KeyValueSize tests that setting the size of the key and the
+// value works correctly.
+func TestSourceConfig_KeyValueSize(t *testing.T) {
+ tests := []struct {
+ size int
+ want int
+ }{
+ {size: 1, want: 1},
+ {size: 42, want: 42},
+ }
+ for _, test := range tests {
+ test := test
+ t.Run(fmt.Sprintf("(size = %v)", test.size), func(t *testing.T)
{
+ dfn := sourceFn{}
+ cfg :=
DefaultSourceConfig().KeySize(test.size).ValueSize(test.size).Build()
+
+ keys, values, err := simulateSourceFn(t, &dfn, cfg)
+ if err != nil {
+ t.Errorf("Failure processing sourceFn: %v", err)
+ }
+ if got := len(keys[0]); got != test.want {
+ t.Errorf("SourceFn emitted keys of wrong size:
got: %v, want: %v",
+ got, test.want)
+ }
+ if got := len(values[0]); got != test.want {
+ t.Errorf("SourceFn emitted values of wrong
size: got: %v, want: %v",
+ got, test.want)
+ }
+ })
+ }
+}
+
// TestSourceConfig_InitialSplits tests that the InitialSplits config option
// works correctly.
func TestSourceConfig_InitialSplits(t *testing.T) {
@@ -107,6 +139,29 @@ func TestSourceConfig_InitialSplits(t *testing.T) {
})
}
+// TestSourceConfig_BuildFromJSON verifies that a SourceConfig decoded from
+// JSON equals the one assembled through the builder's setter methods.
+func TestSourceConfig_BuildFromJSON(t *testing.T) {
+	type testCase struct {
+		jsonData string
+		want     SourceConfig
+	}
+	cases := []testCase{
+		{
+			jsonData: "{\"num_records\": 5, \"key_size\": 2, \"value_size\": 3}",
+			want:     DefaultSourceConfig().NumElements(5).KeySize(2).ValueSize(3).Build(),
+		},
+	}
+	for _, tc := range cases {
+		tc := tc // Give the subtest closure its own copy of the case.
+		name := fmt.Sprintf("(jsonData = %v)", tc.jsonData)
+		t.Run(name, func(t *testing.T) {
+			raw := []byte(tc.jsonData)
+			if got := DefaultSourceConfig().BuildFromJSON(raw); got != tc.want {
+				t.Errorf("Invalid SourceConfig: got: %#v, want: %#v", got, tc.want)
+			}
+		})
+	}
+}
+
// simulateSourceFn calls CreateInitialRestriction, SplitRestriction,
// CreateTracker, and ProcessElement on the given sourceFn with the given
// SourceConfig, and outputs the resulting output elements. This method isn't