This is an automated email from the ASF dual-hosted git repository. danoliveira pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new ce190e1 [BEAM-11108] Add a version of TextIO implemented via SDF. (#13188) ce190e1 is described below commit ce190e11332469ea59b6c9acf16ee7c673ccefdd Author: Daniel Oliveira <daniel.o.program...@gmail.com> AuthorDate: Wed Oct 28 23:28:08 2020 -0700 [BEAM-11108] Add a version of TextIO implemented via SDF. (#13188) Pretty straightforward. Read and ReadSdf should be functionally identical, and likewise for ReadAll and ReadAllSdf. Also adds a new OffsetRange method with unit tests. --- sdks/go/examples/stringsplit/stringsplit.go | 2 +- sdks/go/examples/wordcount/wordcount.go | 8 + sdks/go/pkg/beam/core/runtime/graphx/translate.go | 2 +- .../beam/io/rtrackers/offsetrange/offsetrange.go | 29 ++- .../io/rtrackers/offsetrange/offsetrange_test.go | 97 +++++++++- sdks/go/pkg/beam/io/textio/sdf.go | 203 +++++++++++++++++++++ sdks/go/pkg/beam/io/textio/sdf_test.go | 39 ++++ sdks/go/pkg/beam/io/textio/textio.go | 2 +- 8 files changed, 372 insertions(+), 10 deletions(-) diff --git a/sdks/go/examples/stringsplit/stringsplit.go b/sdks/go/examples/stringsplit/stringsplit.go index e7ea230..13a1905 100644 --- a/sdks/go/examples/stringsplit/stringsplit.go +++ b/sdks/go/examples/stringsplit/stringsplit.go @@ -39,11 +39,11 @@ package main import ( "context" "flag" - "github.com/apache/beam/sdks/go/pkg/beam/core/sdf" "reflect" "time" "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/core/sdf" "github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange" "github.com/apache/beam/sdks/go/pkg/beam/log" "github.com/apache/beam/sdks/go/pkg/beam/x/beamx" diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go index fabaa60..d41ad01 100644 --- a/sdks/go/examples/wordcount/wordcount.go +++ b/sdks/go/examples/wordcount/wordcount.go @@ -101,6 +101,14 @@ var ( // output any number of elements. 
It operates on a PCollection of type string and // returns a PCollection of type string. Also, using named function transforms allows // for easy reuse, modular testing, and an improved monitoring experience. +// +// DoFns must be registered with Beam in order to be executed in ParDos. This is +// done automatically by the starcgen code generator, or it can be done manually +// by calling beam.RegisterFunction in an init() call. +func init() { + beam.RegisterFunction(extractFn) + beam.RegisterFunction(formatFn) +} var ( wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`) diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index a52dd43..8603185 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -123,7 +123,7 @@ func Marshal(edges []*graph.MultiEdge, opt *Options) (*pipepb.Pipeline, error) { if len(edges) == 0 { return nil, errors.New("empty graph") } - + tree := NewScopeTree(edges) m := newMarshaller(opt) diff --git a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go index e36692a..3c76725 100644 --- a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go +++ b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange.go @@ -61,9 +61,10 @@ type Restriction struct { Start, End int64 } -// EvenSplits splits a restriction into a number of evenly sized restrictions. -// Each split restriction is guaranteed to not be empty, and each unit from the -// original restriction is guaranteed to be contained in one split restriction. +// EvenSplits splits a restriction into a number of evenly sized restrictions +// in ascending order. Each split restriction is guaranteed to not be empty, and +// each unit from the original restriction is guaranteed to be contained in one +// split restriction. // // Num should be greater than 0. 
Otherwise there is no way to split the // restriction and this function will return the original restriction. @@ -89,6 +90,28 @@ func (r Restriction) EvenSplits(num int64) (splits []Restriction) { return splits } +// SizedSplits splits a restriction into multiple restrictions of the given +// size, in ascending order. If the restriction cannot be evenly split, the +// final restriction will be the remainder. +// +// Example: (0, 24) split into size 10s -> {(0, 10), (10, 20), (20, 24)} +// +// Size should be greater than 0. Otherwise there is no way to split the +// restriction and this function will return the original restriction. +func (r Restriction) SizedSplits(size int64) (splits []Restriction) { + if size < 1 { + // Don't split, just return original restriction. + return append(splits, r) + } + + s := r.Start + for e := s + size; e < r.End; s, e = e, e+size { + splits = append(splits, Restriction{Start: s, End: e}) + } + splits = append(splits, Restriction{Start: s, End: r.End}) + return splits +} + // Size returns the restriction's size as the difference between Start and End. func (r Restriction) Size() float64 { return float64(r.End - r.Start) diff --git a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go index 489e251..ba7da64 100644 --- a/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go +++ b/sdks/go/pkg/beam/io/rtrackers/offsetrange/offsetrange_test.go @@ -63,10 +63,99 @@ func TestRestriction_EvenSplits(t *testing.T) { t.Errorf("split restriction [%v, %v] has unexpected size. got: %v, want: %v or %v", split.Start, split.End, size, min, min+1) } - // Check: All elements are still in a split restrictions. This - // logic assumes that the splits are returned in order which - // isn't guaranteed by EvenSplits, but this check is way easier - // with the assumption. 
+ // Check: All elements are still in a split restriction and + // the restrictions are in the appropriate ascending order. + if split.Start != prevEnd { + t.Errorf("restriction range [%v, %v] missing after splits.", + prevEnd, split.Start) + } else { + prevEnd = split.End + } + } + if prevEnd != r.End { + t.Errorf("restriction range [%v, %v] missing after splits.", + prevEnd, r.End) + } + }) + } +} + +// TestRestriction_SizedSplits tests various splits and checks that they all +// follow the contract for SizedSplits. This means that all restrictions match +// the given size unless it is a remainder, and that each element is present +// in the split restrictions. +func TestRestriction_SizedSplits(t *testing.T) { + tests := []struct { + name string + rest Restriction + size int64 + want []Restriction + }{ + { + name: "Remainder", + rest: Restriction{Start: 0, End: 11}, + size: 5, + want: []Restriction{{0, 5}, {5, 10}, {10, 11}}, + }, + { + name: "OffsetRemainder", + rest: Restriction{Start: 11, End: 22}, + size: 5, + want: []Restriction{{11, 16}, {16, 21}, {21, 22}}, + }, + { + name: "OffsetExact", + rest: Restriction{Start: 11, End: 21}, + size: 5, + want: []Restriction{{11, 16}, {16, 21}}, + }, + { + name: "LargeValues", + rest: Restriction{Start: 0, End: 1024 * 1024 * 1024}, + size: 400 * 1024 * 1024, + want: []Restriction{ + {0, 400 * 1024 * 1024}, + {400 * 1024 * 1024, 800 * 1024 * 1024}, + {800 * 1024 * 1024, 1024 * 1024 * 1024}, + }, + }, + { + name: "OverlyLargeSize", + rest: Restriction{Start: 0, End: 5}, + size: 10, + want: []Restriction{{0, 5}}, + }, + { + name: "InvalidSize", + rest: Restriction{Start: 0, End: 21}, + size: 0, + want: []Restriction{{0, 21}}, + }, + } + for _, test := range tests { + test := test + t.Run(fmt.Sprintf("%v (rest[%v, %v], size = %v)", + test.name, test.rest.Start, test.rest.End, test.size), func(t *testing.T) { + r := test.rest + + // Get the minimum size that a split restriction can be. Max size + // should be min + 1. 
This way we can check the size of each split. + splits := r.SizedSplits(test.size) + prevEnd := r.Start + for i, split := range splits { + size := split.End - split.Start + // Check: Each restriction has at least 1 element. + if size == 0 { + t.Errorf("split restriction [%v, %v] is empty, size must be greater than 0.", + split.Start, split.End) + } + // Check: Restrictions (except for the last one) must match the split size. + if i != len(splits)-1 && size != test.size { + t.Errorf("split restriction [%v, %v] has unexpected size. got: %v, want: %v", + split.Start, split.End, size, test.size) + } + // Check: All elements are still in a split restriction and + // the restrictions are in the appropriate ascending order. if split.Start != prevEnd { t.Errorf("restriction range [%v, %v] missing after splits.", prevEnd, split.Start) diff --git a/sdks/go/pkg/beam/io/textio/sdf.go b/sdks/go/pkg/beam/io/textio/sdf.go new file mode 100644 index 0000000..bfccca9 --- /dev/null +++ b/sdks/go/pkg/beam/io/textio/sdf.go @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package textio + +import ( + "bufio" + "context" + "io" + "reflect" + "strings" + + "github.com/apache/beam/sdks/go/pkg/beam" + "github.com/apache/beam/sdks/go/pkg/beam/core/sdf" + "github.com/apache/beam/sdks/go/pkg/beam/internal/errors" + "github.com/apache/beam/sdks/go/pkg/beam/io/filesystem" + "github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange" + "github.com/apache/beam/sdks/go/pkg/beam/log" +) + +func init() { + beam.RegisterType(reflect.TypeOf((*readSdfFn)(nil)).Elem()) + beam.RegisterFunction(sizeFn) +} + +// ReadSdf is a variation of Read implemented via SplittableDoFn. This should +// result in increased performance with runners that support splitting. +func ReadSdf(s beam.Scope, glob string) beam.PCollection { + s = s.Scope("textio.ReadSdf") + + filesystem.ValidateScheme(glob) + return readSdf(s, beam.Create(s, glob)) +} + +// ReadAllSdf is a variation of ReadAll implemented via SplittableDoFn. This +// should result in increased performance with runners that support splitting. +func ReadAllSdf(s beam.Scope, col beam.PCollection) beam.PCollection { + s = s.Scope("textio.ReadAllSdf") + + return readSdf(s, col) +} + +// readSdf takes a PCollection of globs and returns a PCollection of lines from +// all files in those globs. Unlike textio.read, this version uses an SDF for +// reading files. +func readSdf(s beam.Scope, col beam.PCollection) beam.PCollection { + files := beam.ParDo(s, expandFn, col) + sized := beam.ParDo(s, sizeFn, files) + return beam.ParDo(s, &readSdfFn{}, sized) +} + +// sizeFn pairs a filename with the size of that file in bytes. +// TODO(BEAM-11109): Once CreateInitialRestriction supports Context params and +// error return values, this can be done in readSdfFn.CreateInitialRestriction. 
+func sizeFn(ctx context.Context, filename string) (string, int64, error) { + fs, err := filesystem.New(ctx, filename) + if err != nil { + return "", -1, err + } + defer fs.Close() + + size, err := fs.Size(ctx, filename) + if err != nil { + return "", -1, err + } + return filename, size, nil +} + +// readSdfFn reads individual lines from a text file, given a filename and a +// size in bytes for that file. +type readSdfFn struct { +} + +// CreateInitialRestriction creates an offset range restriction representing +// the file, using the paired size rather than fetching the file's size. +func (fn *readSdfFn) CreateInitialRestriction(_ string, size int64) offsetrange.Restriction { + return offsetrange.Restriction{ + Start: 0, + End: size, + } +} + +const ( + // blockSize is the desired size of each block for initial splits. + blockSize int64 = 64 * 1024 * 1024 // 64 MB + // tooSmall is the size limit for a block. If the last block is smaller than + // this, it gets merged with the previous block. + tooSmall = blockSize / 4 +) + +// SplitRestriction splits each file restriction into blocks of a predetermined +// size, with some checks to avoid having small remainders. +func (fn *readSdfFn) SplitRestriction(_ string, _ int64, rest offsetrange.Restriction) []offsetrange.Restriction { + splits := rest.SizedSplits(blockSize) + numSplits := len(splits) + if numSplits > 1 { + last := splits[numSplits-1] + if last.End-last.Start <= tooSmall { + // Last restriction is too small, so merge it with previous one. + splits[numSplits-2].End = last.End + splits = splits[:numSplits-1] + } + } + return splits +} + +// RestrictionSize returns the size of each restriction as its range. +func (fn *readSdfFn) RestrictionSize(_ string, _ int64, rest offsetrange.Restriction) float64 { + return rest.Size() +} + +// CreateTracker creates sdf.LockRTrackers wrapping offsetrange.Trackers for +// each restriction.
+func (fn *readSdfFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker { + return sdf.NewLockRTracker(offsetrange.NewTracker(rest)) +} + +// ProcessElement outputs all lines in the file that begin within the paired +// restriction. +// +// Note that restrictions may not align perfectly with lines. So lines can begin +// before the restriction and end within it (those are ignored), and lines can +// begin within the restriction and end past the restriction (those are entirely +// output, including the portion outside the restriction). In some cases a +// valid restriction might not output any lines. +func (fn *readSdfFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, filename string, _ int64, emit func(string)) error { + log.Infof(ctx, "Reading from %v", filename) + + fs, err := filesystem.New(ctx, filename) + if err != nil { + return err + } + defer fs.Close() + + fd, err := fs.OpenRead(ctx, filename) + if err != nil { + return err + } + defer fd.Close() + + rd := bufio.NewReader(fd) + + i := rt.GetRestriction().(offsetrange.Restriction).Start + if i > 0 { + // If the restriction starts after 0, we cannot assume a new line starts + // at the beginning of the restriction, so we must search for the first + // line beginning at or after restriction.Start. This is done by + // scanning to the byte just before the restriction and then reading + // until the next newline, leaving the reader at the start of a new + // line past restriction.Start. + i -= 1 + n, err := rd.Discard(int(i)) // Scan to just before restriction. + if err == io.EOF { + return errors.Errorf("TextIO restriction lies outside the file being read. "+ + "Restriction begins at %v bytes, but file is only %v bytes.", i+1, n) + } + if err != nil { + return err + } + line, err := rd.ReadString('\n') // Read until the first line within the restriction. + if err == io.EOF { + // No lines start in the restriction but it's still valid, so + // finish claiming before returning to avoid errors.
+ rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End) + return nil + } + if err != nil { + return err + } + i += int64(len(line)) + } + + // Claim each line until we claim a line outside the restriction. + for rt.TryClaim(i) { + line, err := rd.ReadString('\n') + if err == io.EOF { + if len(line) != 0 { + emit(strings.TrimSuffix(line, "\n")) + } + // Finish claiming restriction before breaking to avoid errors. + rt.TryClaim(rt.GetRestriction().(offsetrange.Restriction).End) + break + } + if err != nil { + return err + } + emit(strings.TrimSuffix(line, "\n")) + i += int64(len(line)) + } + return nil +} diff --git a/sdks/go/pkg/beam/io/textio/sdf_test.go b/sdks/go/pkg/beam/io/textio/sdf_test.go new file mode 100644 index 0000000..05a26fa --- /dev/null +++ b/sdks/go/pkg/beam/io/textio/sdf_test.go @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package textio + +import ( + "context" + "testing" + + "github.com/apache/beam/sdks/go/pkg/beam" + _ "github.com/apache/beam/sdks/go/pkg/beam/runners/direct" + "github.com/apache/beam/sdks/go/pkg/beam/testing/passert" +) + +// TestReadSdf tests that readSdf successfully reads a test text file, and +// outputs the correct number of lines for it, even for an exceedingly long +// line. +func TestReadSdf(t *testing.T) { + f := "../../../../data/textio_test.txt" + p, s := beam.NewPipelineWithRoot() + lines := ReadSdf(s, f) + passert.Count(s, lines, "NumLines", 1) + + if err := beam.Run(context.Background(), "direct", p); err != nil { + t.Fatalf("Failed to execute job: %v", err) + } +} diff --git a/sdks/go/pkg/beam/io/textio/textio.go b/sdks/go/pkg/beam/io/textio/textio.go index 67a006c..e8bc7ed 100644 --- a/sdks/go/pkg/beam/io/textio/textio.go +++ b/sdks/go/pkg/beam/io/textio/textio.go @@ -104,7 +104,7 @@ func readFn(ctx context.Context, filename string, emit func(string)) error { break } if err != nil { - return (err) + return err } emit(strings.TrimSuffix(line, "\n")) }