This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c614557c51 [Go SDK]: Add fileio MatchFiles, MatchAll and ReadMatches transforms (#25809)
9c614557c51 is described below

commit 9c614557c51ad55230211f864e70f48ad0914326
Author: Johanna Öjeling <51084516+johannaojel...@users.noreply.github.com>
AuthorDate: Wed Mar 29 22:44:29 2023 +0200

    [Go SDK]: Add fileio MatchFiles, MatchAll and ReadMatches transforms (#25809)
    
    * Create utility file structs
    
    * Create MatchFiles and MatchAll transforms
    
    * Create ReadMatches transform
    
    * Add example doc for transforms
    
    * Move FileMetadata and Compression to fileio
    
    * Provide functional options without exporting enums
    
    * Update CHANGES.md
    
    * Rename read compression options
    
    * Provide default options
---
 CHANGES.md                                 |   1 +
 sdks/go/pkg/beam/io/fileio/example_test.go |  74 ++++++++
 sdks/go/pkg/beam/io/fileio/file.go         | 142 ++++++++++++++
 sdks/go/pkg/beam/io/fileio/file_test.go    | 254 +++++++++++++++++++++++++
 sdks/go/pkg/beam/io/fileio/gzip.go         |  60 ++++++
 sdks/go/pkg/beam/io/fileio/helper_test.go  |  88 +++++++++
 sdks/go/pkg/beam/io/fileio/match.go        | 189 +++++++++++++++++++
 sdks/go/pkg/beam/io/fileio/match_test.go   | 285 +++++++++++++++++++++++++++++
 sdks/go/pkg/beam/io/fileio/read.go         | 140 ++++++++++++++
 sdks/go/pkg/beam/io/fileio/read_test.go    | 181 ++++++++++++++++++
 10 files changed, 1414 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index ab8a0b797a7..0f54d233ad6 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -62,6 +62,7 @@
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 * BigQuery Storage Write API is now available in Python SDK via cross-language ([#21961](https://github.com/apache/beam/issues/21961)).
 * Added HbaseIO support for writing RowMutations (ordered by rowkey) to Hbase (Java) ([#25830](https://github.com/apache/beam/issues/25830)).
+* Added fileio transforms MatchFiles, MatchAll and ReadMatches (Go) ([#25779](https://github.com/apache/beam/issues/25779)).
 
 ## New Features / Improvements
 
diff --git a/sdks/go/pkg/beam/io/fileio/example_test.go b/sdks/go/pkg/beam/io/fileio/example_test.go
new file mode 100644
index 00000000000..ed27546f9dd
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/example_test.go
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio_test
+
+import (
+       "context"
+       "log"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
+)
+
+// ExampleMatchFiles builds a pipeline that expands a single glob pattern with
+// fileio.MatchFiles and prints the resulting elements.
+func ExampleMatchFiles() {
+       beam.Init()
+       pipeline, root := beam.NewPipelineWithRoot()
+
+       // Expand the glob and hand the resulting collection straight to the printer.
+       debug.Print(root, fileio.MatchFiles(root, "gs://path/to/*.gz"))
+
+       ctx := context.Background()
+       err := beamx.Run(ctx, pipeline)
+       if err != nil {
+               log.Fatalf("Failed to execute job: %v", err)
+       }
+}
+
+// ExampleMatchAll builds a pipeline that expands several glob patterns, supplied as a
+// PCollection of strings, with fileio.MatchAll and prints the resulting elements.
+func ExampleMatchAll() {
+       beam.Init()
+       p, s := beam.NewPipelineWithRoot()
+
+       // Each element of globs is matched independently.
+       globs := beam.Create(s, "gs://path/to/sub1/*.gz", "gs://path/to/sub2/*.gz")
+       matches := fileio.MatchAll(s, globs)
+       debug.Print(s, matches)
+
+       if err := beamx.Run(context.Background(), p); err != nil {
+               log.Fatalf("Failed to execute job: %v", err)
+       }
+}
+
+// ExampleReadMatches builds a pipeline that matches files, converts the matches with
+// fileio.ReadMatches and emits a (path, contents) pair for every file.
+func ExampleReadMatches() {
+       beam.Init()
+       p, s := beam.NewPipelineWithRoot()
+
+       // pairFn reads the whole file into memory and emits its path together with its contents.
+       pairFn := func(ctx context.Context, file fileio.ReadableFile, emit func(string, string)) error {
+               contents, err := file.ReadString(ctx)
+               if err != nil {
+                       return err
+               }
+               emit(file.Metadata.Path, contents)
+               return nil
+       }
+
+       matches := fileio.MatchFiles(s, "gs://path/to/*.gz")
+       files := fileio.ReadMatches(s, matches)
+       pairs := beam.ParDo(s, pairFn, files)
+       debug.Print(s, pairs)
+
+       if err := beamx.Run(context.Background(), p); err != nil {
+               log.Fatalf("Failed to execute job: %v", err)
+       }
+}
diff --git a/sdks/go/pkg/beam/io/fileio/file.go b/sdks/go/pkg/beam/io/fileio/file.go
new file mode 100644
index 00000000000..4ae7b3d3d07
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/file.go
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+       "context"
+       "errors"
+       "io"
+       "path/filepath"
+       "reflect"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+)
+
+// init registers the FileMetadata and ReadableFile struct types with beam.RegisterType.
+func init() {
+       metadataType := reflect.TypeOf(FileMetadata{})
+       readableType := reflect.TypeOf(ReadableFile{})
+
+       beam.RegisterType(metadataType)
+       beam.RegisterType(readableType)
+}
+
+// FileMetadata contains metadata about a file, namely its path and size in bytes.
+type FileMetadata struct {
+       // Path is the path of the file.
+       Path string
+       // Size is the size of the file in bytes.
+       Size int64
+}
+
+// compressionType is the type of compression used to compress a file.
+type compressionType int
+
+const (
+       // compressionAuto indicates that the compression type should be auto-detected. It is the
+       // zero value of compressionType.
+       compressionAuto compressionType = iota
+       // compressionGzip indicates that the file is compressed using gzip.
+       compressionGzip
+       // compressionUncompressed indicates that the file is not compressed.
+       compressionUncompressed
+)
+
+// ReadableFile is a wrapper around a FileMetadata and compressionType that can be used to obtain a
+// file descriptor or read the file's contents.
+type ReadableFile struct {
+       // Metadata identifies the file to read.
+       Metadata FileMetadata
+       // Compression selects how the file is decompressed when it is opened.
+       Compression compressionType
+}
+
+// Open opens the file for reading. The compression type is determined by the Compression field of
+// the ReadableFile. If Compression is compressionAuto, the compression type is auto-detected from
+// the file extension. It is the caller's responsibility to close the returned reader.
+//
+// If the decompression reader cannot be created, the underlying file reader is closed before the
+// error is returned, so nothing is leaked on failure.
+func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error) {
+       fs, err := filesystem.New(ctx, f.Metadata.Path)
+       if err != nil {
+               return nil, err
+       }
+       defer fs.Close()
+
+       rc, err := fs.OpenRead(ctx, f.Metadata.Path)
+       if err != nil {
+               return nil, err
+       }
+
+       comp := f.Compression
+       if comp == compressionAuto {
+               comp = compressionFromExt(f.Metadata.Path)
+       }
+
+       dr, err := newDecompressionReader(rc, comp)
+       if err != nil {
+               // rc has not been handed to the caller yet, so it must be released here.
+               if closeErr := rc.Close(); closeErr != nil {
+                       log.Errorf(ctx, "error closing reader: %v", closeErr)
+               }
+               return nil, err
+       }
+
+       return dr, nil
+}
+
+// compressionFromExt detects the compression of a file based on its extension. If the extension is
+// not recognized, compressionUncompressed is returned. The comparison is case-sensitive: only a
+// trailing ".gz" maps to compressionGzip.
+func compressionFromExt(path string) compressionType {
+       switch filepath.Ext(path) {
+       case ".gz":
+               return compressionGzip
+       default:
+               return compressionUncompressed
+       }
+}
+
+// newDecompressionReader returns an io.ReadCloser that can be used to read 
uncompressed data from
+// reader, based on the specified compression. If the compression is 
compressionAuto, a non-nil
+// error is returned. It is the caller's responsibility to close the returned 
reader.
+func newDecompressionReader(
+       reader io.ReadCloser,
+       compression compressionType,
+) (io.ReadCloser, error) {
+       switch compression {
+       case compressionAuto:
+               return nil, errors.New(
+                       "compression must be resolved into a concrete type 
before obtaining a reader",
+               )
+       case compressionGzip:
+               return newGzipReader(reader)
+       default:
+               return reader, nil
+       }
+}
+
+// Read reads the entire file into memory and returns the contents. The file is opened with Open,
+// so it is decompressed according to the Compression field.
+//
+// The reader is always closed before returning. A close error is returned only when reading
+// itself succeeded; otherwise the read error wins and the close error is logged.
+func (f ReadableFile) Read(ctx context.Context) (data []byte, err error) {
+       rc, err := f.Open(ctx)
+       if err != nil {
+               return nil, err
+       }
+
+       // err is a named result so the deferred close can report its own failure to the caller.
+       defer func() {
+               closeErr := rc.Close()
+               if err != nil {
+                       if closeErr != nil {
+                               log.Errorf(ctx, "error closing reader: %v", closeErr)
+                       }
+                       return
+               }
+               err = closeErr
+       }()
+
+       return io.ReadAll(rc)
+}
+
+// ReadString reads the entire file into memory and returns the contents as a string. It returns
+// the empty string together with the error if Read fails.
+func (f ReadableFile) ReadString(ctx context.Context) (string, error) {
+       data, err := f.Read(ctx)
+       if err != nil {
+               return "", err
+       }
+
+       return string(data), nil
+}
diff --git a/sdks/go/pkg/beam/io/fileio/file_test.go b/sdks/go/pkg/beam/io/fileio/file_test.go
new file mode 100644
index 00000000000..5f7d292387c
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/file_test.go
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+       "bytes"
+       "context"
+       "path/filepath"
+       "testing"
+       "testing/iotest"
+)
+
+// TestReadableFile_Open verifies that Open yields a reader producing the original contents for
+// an uncompressed file and for a gzip file whose compression is auto-detected.
+func TestReadableFile_Open(t *testing.T) {
+       dir := t.TempDir()
+       write(t, filepath.Join(dir, "file1.txt"), []byte("test1"))
+       writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2"))
+
+       tests := []struct {
+               name string
+               file ReadableFile
+               want []byte
+       }{
+               {
+                       name: "Open uncompressed file",
+                       file: ReadableFile{
+                               Metadata: FileMetadata{
+                                       Path: filepath.Join(dir, "file1.txt"),
+                               },
+                               Compression: compressionUncompressed,
+                       },
+                       want: []byte("test1"),
+               },
+               {
+                       name: "Open file with auto-detection of compression",
+                       file: ReadableFile{
+                               Metadata: FileMetadata{
+                                       Path: filepath.Join(dir, "file2.gz"),
+                               },
+                               Compression: compressionAuto,
+                       },
+                       want: []byte("test2"),
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       ctx := context.Background()
+
+                       rc, err := tt.file.Open(ctx)
+                       if err != nil {
+                               t.Fatalf("Open() error = %v, want nil", err)
+                       }
+
+                       t.Cleanup(func() {
+                               rc.Close()
+                       })
+
+                       if err := iotest.TestReader(rc, tt.want); err != nil {
+                               t.Errorf("TestReader() error = %v, want nil", err)
+                       }
+               })
+       }
+}
+
+func Test_compressionFromExt(t *testing.T) {
+       tests := []struct {
+               name string
+               path string
+               want compressionType
+       }{
+               {
+                       name: "compressionGzip for gz extension",
+                       path: "file.gz",
+                       want: compressionGzip,
+               },
+               {
+                       name: "compressionUncompressed for no extension",
+                       path: "file",
+                       want: compressionUncompressed,
+               },
+               {
+                       name: "compressionUncompressed for unrecognized 
extension",
+                       path: "file.unknown",
+                       want: compressionUncompressed,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if got := compressionFromExt(tt.path); got != tt.want {
+                               t.Errorf("compressionFromExt() = %v, want %v", 
got, tt.want)
+                       }
+               })
+       }
+}
+
+func Test_newDecompressionReader(t *testing.T) {
+       dir := t.TempDir()
+       write(t, filepath.Join(dir, "file1.txt"), []byte("test1"))
+       writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2"))
+
+       tests := []struct {
+               name    string
+               path    string
+               comp    compressionType
+               want    []byte
+               wantErr bool
+       }{
+               {
+                       name: "Reader for uncompressed file",
+                       path: filepath.Join(dir, "file1.txt"),
+                       comp: compressionUncompressed,
+                       want: []byte("test1"),
+               },
+               {
+                       name: "Reader for gzip compressed file",
+                       path: filepath.Join(dir, "file2.gz"),
+                       comp: compressionGzip,
+                       want: []byte("test2"),
+               },
+               {
+                       name:    "Error - reader for auto compression not 
supported",
+                       path:    filepath.Join(dir, "file2.gz"),
+                       comp:    compressionAuto,
+                       wantErr: true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       rc := openFile(t, tt.path)
+
+                       dr, err := newDecompressionReader(rc, tt.comp)
+                       if (err != nil) != tt.wantErr {
+                               t.Fatalf("newDecompressionReader() error = %v, 
wantErr %v", err, tt.wantErr)
+                       }
+                       if tt.wantErr {
+                               return
+                       }
+
+                       t.Cleanup(func() {
+                               dr.Close()
+                       })
+
+                       if err := iotest.TestReader(dr, tt.want); err != nil {
+                               t.Errorf("TestReader() error = %v, want nil", 
err)
+                       }
+               })
+       }
+}
+
+// TestReadableFile_Read verifies that Read returns the full, decompressed contents of an
+// uncompressed file and of a gzip compressed file.
+func TestReadableFile_Read(t *testing.T) {
+       dir := t.TempDir()
+       write(t, filepath.Join(dir, "file1.txt"), []byte("test1"))
+       writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2"))
+
+       tests := []struct {
+               name string
+               file ReadableFile
+               want []byte
+       }{
+               {
+                       name: "Read contents from uncompressed file",
+                       file: ReadableFile{
+                               Metadata: FileMetadata{
+                                       Path: filepath.Join(dir, "file1.txt"),
+                               },
+                               Compression: compressionUncompressed,
+                       },
+                       want: []byte("test1"),
+               },
+               {
+                       name: "Read contents from gzip compressed file",
+                       file: ReadableFile{
+                               Metadata: FileMetadata{
+                                       Path: filepath.Join(dir, "file2.gz"),
+                               },
+                               Compression: compressionGzip,
+                       },
+                       want: []byte("test2"),
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       ctx := context.Background()
+
+                       got, err := tt.file.Read(ctx)
+                       if err != nil {
+                               t.Fatalf("Read() error = %v, want nil", err)
+                       }
+
+                       if !bytes.Equal(got, tt.want) {
+                               t.Errorf("Read() got = %v, want %v", got, tt.want)
+                       }
+               })
+       }
+}
+
+func TestReadableFile_ReadString(t *testing.T) {
+       dir := t.TempDir()
+       write(t, filepath.Join(dir, "file1.txt"), []byte("test1"))
+       writeGzip(t, filepath.Join(dir, "file2.gz"), []byte("test2"))
+
+       tests := []struct {
+               name string
+               file ReadableFile
+               want string
+       }{
+               {
+                       name: "Read contents from uncompressed file as string",
+                       file: ReadableFile{
+                               Metadata: FileMetadata{
+                                       Path: filepath.Join(dir, "file1.txt"),
+                               },
+                               Compression: compressionUncompressed,
+                       },
+                       want: "test1",
+               },
+               {
+                       name: "Read contents from gzip compressed file as 
string",
+                       file: ReadableFile{
+                               Metadata: FileMetadata{
+                                       Path: filepath.Join(dir, "file2.gz"),
+                               },
+                               Compression: compressionGzip,
+                       },
+                       want: "test2",
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       ctx := context.Background()
+
+                       got, err := tt.file.ReadString(ctx)
+                       if err != nil {
+                               t.Fatalf("ReadString() error = %v, want nil", 
err)
+                       }
+
+                       if got != tt.want {
+                               t.Errorf("ReadString() got = %v, want %v", got, 
tt.want)
+                       }
+               })
+       }
+}
diff --git a/sdks/go/pkg/beam/io/fileio/gzip.go b/sdks/go/pkg/beam/io/fileio/gzip.go
new file mode 100644
index 00000000000..5a63d9be7d3
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/gzip.go
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+       "compress/gzip"
+       "context"
+       "io"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
+)
+
+// gzipReader is a wrapper around a gzip.Reader that also closes the underlying io.ReadCloser.
+type gzipReader struct {
+       // rc is the underlying compressed stream; it is closed by Close.
+       rc io.ReadCloser
+       // zr decompresses the data read from rc.
+       zr *gzip.Reader
+}
+
+// newGzipReader creates a new gzipReader from an io.ReadCloser.
+func newGzipReader(rc io.ReadCloser) (*gzipReader, error) {
+       zr, err := gzip.NewReader(rc)
+       if err != nil {
+               return nil, err
+       }
+       return &gzipReader{rc: rc, zr: zr}, nil
+}
+
+// Read fills p with decompressed data by delegating to the wrapped gzip.Reader.
+func (r *gzipReader) Read(p []byte) (int, error) {
+       n, err := r.zr.Read(p)
+       return n, err
+}
+
+// Close closes the gzip reader and the underlying io.ReadCloser.
+func (r *gzipReader) Close() (err error) {
+       defer func() {
+               rcErr := r.rc.Close()
+               if err != nil {
+                       if rcErr != nil {
+                               log.Errorf(context.Background(), "error closing 
reader: %v", rcErr)
+                       }
+                       return
+               }
+               err = rcErr
+       }()
+
+       return r.zr.Close()
+}
diff --git a/sdks/go/pkg/beam/io/fileio/helper_test.go b/sdks/go/pkg/beam/io/fileio/helper_test.go
new file mode 100644
index 00000000000..e8b61ef067f
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/helper_test.go
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+       "bufio"
+       "compress/gzip"
+       "io"
+       "os"
+       "path/filepath"
+       "testing"
+)
+
+// openFile opens the file at path for reading and fails the test immediately if it cannot be
+// opened. The caller is responsible for closing the returned reader.
+func openFile(t *testing.T, path string) io.ReadCloser {
+       t.Helper()
+
+       file, openErr := os.Open(path)
+       if openErr != nil {
+               t.Fatal(openErr)
+       }
+
+       return file
+}
+
+// createFile creates (or truncates) the file at path, first creating any missing parent
+// directories. Any failure aborts the test. The caller is responsible for closing the file.
+func createFile(t *testing.T, path string) *os.File {
+       t.Helper()
+
+       parent := filepath.Dir(path)
+       if mkErr := os.MkdirAll(parent, 0750); mkErr != nil {
+               // An "already exists" failure is tolerated; everything else is fatal.
+               if !os.IsExist(mkErr) {
+                       t.Fatal(mkErr)
+               }
+       }
+
+       created, createErr := os.Create(path)
+       if createErr != nil {
+               t.Fatal(createErr)
+       }
+
+       return created
+}
+
+// write writes data to the file at path, creating the file and any missing parent directories.
+// Any failure, including a failed close of the file, is reported on t.
+func write(t *testing.T, path string, data []byte) {
+       t.Helper()
+
+       f := createFile(t, path)
+       defer func() {
+               // A failed close may mean the data never reached the file, so surface it.
+               if err := f.Close(); err != nil {
+                       t.Errorf("closing %q: %v", path, err)
+               }
+       }()
+
+       bw := bufio.NewWriter(f)
+       if _, err := bw.Write(data); err != nil {
+               t.Fatal(err)
+       }
+
+       if err := bw.Flush(); err != nil {
+               t.Fatal(err)
+       }
+}
+
+// writeGzip compresses data with gzip and writes it to the file at path, creating the file and
+// any missing parent directories. Any failure, including a failed close of the file, is reported
+// on t.
+func writeGzip(t *testing.T, path string, data []byte) {
+       t.Helper()
+
+       f := createFile(t, path)
+       defer func() {
+               // A failed close may mean the data never reached the file, so surface it.
+               if err := f.Close(); err != nil {
+                       t.Errorf("closing %q: %v", path, err)
+               }
+       }()
+
+       zw := gzip.NewWriter(f)
+       if _, err := zw.Write(data); err != nil {
+               t.Fatal(err)
+       }
+
+       // Closing the gzip writer flushes the remaining compressed data to f.
+       if err := zw.Close(); err != nil {
+               t.Fatal(err)
+       }
+}
diff --git a/sdks/go/pkg/beam/io/fileio/match.go b/sdks/go/pkg/beam/io/fileio/match.go
new file mode 100644
index 00000000000..0c8470c61b6
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/match.go
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fileio provides transforms for matching and reading files.
+package fileio
+
+import (
+       "context"
+       "fmt"
+       "strings"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+// init registers matchFn and the FileMetadata emitter it uses with the register package.
+func init() {
+       register.DoFn3x1[context.Context, string, func(FileMetadata), error](&matchFn{})
+       register.Emitter1[FileMetadata]()
+}
+
+// emptyTreatment controls how empty matches of a pattern are treated.
+type emptyTreatment int
+
+const (
+       // emptyAllow allows empty matches. It is the zero value of emptyTreatment.
+       emptyAllow emptyTreatment = iota
+       // emptyDisallow disallows empty matches.
+       emptyDisallow
+       // emptyAllowIfWildcard allows empty matches if the pattern contains a wildcard.
+       emptyAllowIfWildcard
+)
+
+// matchOption holds the settings that MatchOptionFn values modify.
+type matchOption struct {
+       // EmptyTreatment selects how a pattern that matches no files is handled.
+       EmptyTreatment emptyTreatment
+}
+
+// MatchOptionFn is a function that can be passed to MatchFiles or MatchAll to configure options for
+// matching files.
+type MatchOptionFn func(*matchOption)
+
+// MatchEmptyAllowIfWildcard specifies that empty matches are allowed if the pattern contains a
+// wildcard.
+func MatchEmptyAllowIfWildcard() MatchOptionFn {
+       return func(o *matchOption) {
+               o.EmptyTreatment = emptyAllowIfWildcard
+       }
+}
+
+// MatchEmptyAllow returns an option under which a pattern that matches no files is accepted.
+func MatchEmptyAllow() MatchOptionFn {
+       apply := func(o *matchOption) {
+               o.EmptyTreatment = emptyAllow
+       }
+       return apply
+}
+
+// MatchEmptyDisallow returns an option under which a pattern that matches no files is rejected.
+func MatchEmptyDisallow() MatchOptionFn {
+       apply := func(o *matchOption) {
+               o.EmptyTreatment = emptyDisallow
+       }
+       return apply
+}
+
+// MatchFiles finds all files matching the glob pattern and returns a PCollection<FileMetadata> of
+// the matching files. MatchFiles accepts a variadic number of MatchOptionFn that can be used to
+// configure the treatment of empty matches. By default, empty matches are allowed if the pattern
+// contains a wildcard.
+//
+// The glob is passed to filesystem.ValidateScheme while the pipeline is being constructed, and
+// the matching itself is delegated to MatchAll.
+func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection {
+       s = s.Scope("fileio.MatchFiles")
+
+       filesystem.ValidateScheme(glob)
+       return MatchAll(s, beam.Create(s, glob), opts...)
+}
+
+// MatchAll finds all files matching the glob patterns given by the incoming PCollection<string> and
+// returns a PCollection<FileMetadata> of the matching files. MatchAll accepts a variadic number of
+// MatchOptionFn that can be used to configure the treatment of empty matches. By default, empty
+// matches are allowed if the pattern contains a wildcard. Options are applied in order, so a
+// later option overrides an earlier one; nil options are ignored.
+func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection {
+       s = s.Scope("fileio.MatchAll")
+
+       option := &matchOption{
+               EmptyTreatment: emptyAllowIfWildcard,
+       }
+
+       for _, opt := range opts {
+               // Skip nil options instead of panicking on the call.
+               if opt == nil {
+                       continue
+               }
+               opt(option)
+       }
+
+       return beam.ParDo(s, newMatchFn(option), col)
+}
+
+// matchFn is the DoFn that expands a glob pattern into the FileMetadata of the files it matches.
+type matchFn struct {
+       // EmptyTreatment selects how a pattern that matches no files is handled.
+       EmptyTreatment emptyTreatment
+}
+
+// newMatchFn builds a matchFn that uses the empty-match treatment carried by option.
+func newMatchFn(option *matchOption) *matchFn {
+       fn := new(matchFn)
+       fn.EmptyTreatment = option.EmptyTreatment
+       return fn
+}
+
+// ProcessElement lists the files matching glob and emits a FileMetadata for each of them. A glob
+// consisting only of whitespace is skipped. When nothing matches, the configured EmptyTreatment
+// decides whether that is an error.
+func (fn *matchFn) ProcessElement(
+       ctx context.Context,
+       glob string,
+       emit func(FileMetadata),
+) error {
+       if len(strings.TrimSpace(glob)) == 0 {
+               return nil
+       }
+
+       fs, err := filesystem.New(ctx, glob)
+       if err != nil {
+               return err
+       }
+       defer fs.Close()
+
+       paths, err := fs.List(ctx, glob)
+       if err != nil {
+               return err
+       }
+
+       if len(paths) == 0 {
+               if allowEmptyMatch(glob, fn.EmptyTreatment) {
+                       return nil
+               }
+               return fmt.Errorf("no files matching pattern %q", glob)
+       }
+
+       entries, err := metadataFromFiles(ctx, fs, paths)
+       if err != nil {
+               return err
+       }
+
+       for i := range entries {
+               emit(entries[i])
+       }
+
+       return nil
+}
+
+// allowEmptyMatch reports whether a glob that matched no files is acceptable under treatment.
+//
+// For emptyAllowIfWildcard the glob counts as containing a wildcard when it contains any of the
+// glob metacharacters '*', '?' or '['. For emptyDisallow the result is always false; for any
+// other treatment it is always true.
+func allowEmptyMatch(glob string, treatment emptyTreatment) bool {
+       switch treatment {
+       case emptyDisallow:
+               return false
+       case emptyAllowIfWildcard:
+               // '?' and '[' start wildcards just as '*' does, so they must count as well.
+               return strings.ContainsAny(glob, "*?[")
+       default:
+               return true
+       }
+}
+
+// metadataFromFiles looks up the size of every path in files using fs and returns one
+// FileMetadata per path, in the same order. It returns nil for an empty input and stops at the
+// first size lookup that fails.
+func metadataFromFiles(
+       ctx context.Context,
+       fs filesystem.Interface,
+       files []string,
+) ([]FileMetadata, error) {
+       if len(files) == 0 {
+               return nil, nil
+       }
+
+       result := make([]FileMetadata, 0, len(files))
+
+       for _, p := range files {
+               n, err := fs.Size(ctx, p)
+               if err != nil {
+                       return nil, err
+               }
+
+               result = append(result, FileMetadata{Path: p, Size: n})
+       }
+
+       return result, nil
+}
diff --git a/sdks/go/pkg/beam/io/fileio/match_test.go b/sdks/go/pkg/beam/io/fileio/match_test.go
new file mode 100644
index 00000000000..57b2d8cfe1c
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/match_test.go
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+       "context"
+       "path/filepath"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+       "github.com/google/go-cmp/cmp"
+)
+
+// testFile describes a fixture file that the tests write to disk: its file
+// name and the bytes stored in it.
+type testFile struct {
+       filename string
+       data     []byte
+}
+
+// testFiles is the shared set of fixtures: two .txt files, one of them empty,
+// and one .csv file.
+var testFiles = []testFile{
+       {
+               filename: "file1.txt",
+               data:     []byte("test1"),
+       },
+       {
+               filename: "file2.txt",
+               data:     []byte(""),
+       },
+       {
+               filename: "file3.csv",
+               data:     []byte("test3"),
+       },
+}
+
+func TestMatchFiles(t *testing.T) {
+       dir := t.TempDir()
+       testDir := filepath.Join(dir, "testdata")
+
+       for _, tf := range testFiles {
+               write(t, filepath.Join(testDir, tf.filename), tf.data)
+       }
+
+       tests := []struct {
+               name string
+               glob string
+               opts []MatchOptionFn
+               want []any
+       }{
+               {
+                       name: "Match files",
+                       glob: filepath.Join(dir, "*", "file*.txt"),
+                       want: []any{
+                               FileMetadata{
+                                       Path: filepath.Join(testDir, 
"file1.txt"),
+                                       Size: 5,
+                               },
+                               FileMetadata{
+                                       Path: filepath.Join(testDir, 
"file2.txt"),
+                                       Size: 0,
+                               },
+                       },
+               },
+               {
+                       name: "Read matches with specified empty match 
treatment",
+                       opts: []MatchOptionFn{
+                               MatchEmptyAllow(),
+                       },
+                       glob: filepath.Join(dir, "non-existent.txt"),
+                       want: nil,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       p, s := beam.NewPipelineWithRoot()
+
+                       got := MatchFiles(s, tt.glob, tt.opts...)
+
+                       passert.Equals(s, got, tt.want...)
+                       ptest.RunAndValidate(t, p)
+               })
+       }
+}
+
+// TestMatchAll feeds a PCollection of globs through MatchAll and checks the
+// emitted FileMetadata, or that the pipeline fails when wantErr is set.
+func TestMatchAll(t *testing.T) {
+       dir := t.TempDir()
+       testDir := filepath.Join(dir, "testdata")
+
+       // Write the shared fixtures into the testdata subdirectory.
+       for _, tf := range testFiles {
+               write(t, filepath.Join(testDir, tf.filename), tf.data)
+       }
+
+       tests := []struct {
+               name    string
+               opts    []MatchOptionFn
+               input   []any
+               want    []any
+               wantErr bool
+       }{
+               {
+                       name: "Match all",
+                       input: []any{
+                               filepath.Join(dir, "*", "file*.txt"),
+                               filepath.Join(dir, "*", "file*.csv"),
+                       },
+                       want: []any{
+                               FileMetadata{
+                                       Path: filepath.Join(testDir, 
"file1.txt"),
+                                       Size: 5,
+                               },
+                               FileMetadata{
+                                       Path: filepath.Join(testDir, 
"file2.txt"),
+                                       Size: 0,
+                               },
+                               FileMetadata{
+                                       Path: filepath.Join(testDir, 
"file3.csv"),
+                                       Size: 5,
+                               },
+                       },
+               },
+               {
+                       name: "No matches",
+                       input: []any{
+                               filepath.Join(dir, "*", "non-existent.txt"),
+                       },
+                       want: nil,
+               },
+               {
+                       name:  "No matches for empty glob",
+                       input: []any{""},
+                       want:  nil,
+               },
+               {
+                       name: "No matches for glob without wildcard and empty 
matches allowed",
+                       opts: []MatchOptionFn{
+                               MatchEmptyAllow(),
+                       },
+                       input: []any{
+                               filepath.Join(dir, "non-existent.txt"),
+                       },
+                       want: nil,
+               },
+               {
+                       name: "Error - no matches for glob without wildcard",
+                       input: []any{
+                               filepath.Join(dir, "non-existent.txt"),
+                       },
+                       wantErr: true,
+               },
+               {
+                       name: "Error - no matches and empty matches disallowed",
+                       opts: []MatchOptionFn{
+                               MatchEmptyDisallow(),
+                       },
+                       input: []any{
+                               filepath.Join(dir, "*", "non-existent.txt"),
+                       },
+                       wantErr: true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       p, s := beam.NewPipelineWithRoot()
+
+                       col := beam.Create(s, tt.input...)
+                       got := MatchAll(s, col, tt.opts...)
+
+                       passert.Equals(s, got, tt.want...)
+                       // The pipeline must fail exactly when the case expects an error.
+                       if err := ptest.Run(p); (err != nil) != tt.wantErr {
+                               t.Errorf("MatchAll() error = %v, wantErr %v", 
err, tt.wantErr)
+                       }
+               })
+       }
+}
+
+// Test_allowEmptyMatch checks the decision allowEmptyMatch makes for each
+// emptyTreatment value, with and without a wildcard in the glob.
+func Test_allowEmptyMatch(t *testing.T) {
+       tests := []struct {
+               name      string
+               glob      string
+               treatment emptyTreatment
+               want      bool
+       }{
+               {
+                       name:      "Allow for emptyAllow",
+                       glob:      "path/to/file.txt",
+                       treatment: emptyAllow,
+                       want:      true,
+               },
+               {
+                       name:      "Disallow for emptyDisallow",
+                       glob:      "path/to/file.txt",
+                       treatment: emptyDisallow,
+                       want:      false,
+               },
+               {
+                       name:      "Allow for glob with wildcard and 
emptyAllowIfWildcard",
+                       glob:      "path/to/*.txt",
+                       treatment: emptyAllowIfWildcard,
+                       want:      true,
+               },
+               {
+                       name:      "Disallow for glob without wildcard and 
emptyAllowIfWildcard",
+                       glob:      "path/to/file.txt",
+                       treatment: emptyAllowIfWildcard,
+                       want:      false,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if got := allowEmptyMatch(tt.glob, tt.treatment); got 
!= tt.want {
+                               t.Errorf("allowEmptyMatch() = %v, want %v", 
got, tt.want)
+                       }
+               })
+       }
+}
+
+// Test_metadataFromFiles checks that metadataFromFiles reports the path and
+// size of each fixture file on the local filesystem, and returns nil for a nil
+// input.
+func Test_metadataFromFiles(t *testing.T) {
+       dir := t.TempDir()
+       files := make([]string, len(testFiles))
+
+       // Write each fixture and remember its full path as test input.
+       for i, tf := range testFiles {
+               file := filepath.Join(dir, tf.filename)
+               write(t, file, tf.data)
+               files[i] = file
+       }
+
+       tests := []struct {
+               name  string
+               files []string
+               want  []FileMetadata
+       }{
+               {
+                       name:  "Slice of FileMetadata from file paths",
+                       files: files,
+                       want: []FileMetadata{
+                               {
+                                       Path: filepath.Join(dir, "file1.txt"),
+                                       Size: 5,
+                               },
+                               {
+                                       Path: filepath.Join(dir, "file2.txt"),
+                                       Size: 0,
+                               },
+                               {
+                                       Path: filepath.Join(dir, "file3.csv"),
+                                       Size: 5,
+                               },
+                       },
+               },
+               {
+                       name:  "Nil when files is empty",
+                       files: nil,
+                       want:  nil,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       ctx := context.Background()
+                       fs := local.New(ctx)
+
+                       got, err := metadataFromFiles(ctx, fs, tt.files)
+                       if err != nil {
+                               t.Fatalf("metadataFromFiles() error = %v, want 
nil", err)
+                       }
+
+                       if !cmp.Equal(got, tt.want) {
+                               t.Errorf("metadataFromFiles() got = %v, want 
%v", got, tt.want)
+                       }
+               })
+       }
+}
diff --git a/sdks/go/pkg/beam/io/fileio/read.go 
b/sdks/go/pkg/beam/io/fileio/read.go
new file mode 100644
index 00000000000..b06f6a2a6f9
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/read.go
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+       "fmt"
+       "strings"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+// init registers readFn and the ReadableFile emitter with the Beam register
+// package.
+func init() {
+       register.DoFn2x1[FileMetadata, func(ReadableFile), error](&readFn{})
+       register.Emitter1[ReadableFile]()
+}
+
+// directoryTreatment controls how paths to directories are treated when
+// reading matches.
+type directoryTreatment int
+
+const (
+       // directorySkip skips directories.
+       directorySkip directoryTreatment = iota
+       // directoryDisallow disallows directories.
+       directoryDisallow
+)
+
+// readOption holds the settings that ReadOptionFn values modify: the
+// compression type and the directory treatment.
+type readOption struct {
+       Compression        compressionType
+       DirectoryTreatment directoryTreatment
+}
+
+// ReadOptionFn is a function that can be passed to ReadMatches to configure
+// options for reading files.
+type ReadOptionFn func(*readOption)
+
+// ReadAutoCompression returns a ReadOptionFn that selects automatic detection
+// of each file's compression type.
+func ReadAutoCompression() ReadOptionFn {
+       return func(option *readOption) {
+               option.Compression = compressionAuto
+       }
+}
+
+// ReadGzip returns a ReadOptionFn that marks files as gzip-compressed.
+func ReadGzip() ReadOptionFn {
+       return func(option *readOption) {
+               option.Compression = compressionGzip
+       }
+}
+
+// ReadUncompressed returns a ReadOptionFn that marks files as not compressed.
+func ReadUncompressed() ReadOptionFn {
+       return func(option *readOption) {
+               option.Compression = compressionUncompressed
+       }
+}
+
+// ReadDirectorySkip returns a ReadOptionFn that makes ReadMatches skip
+// directories.
+func ReadDirectorySkip() ReadOptionFn {
+       return func(option *readOption) {
+               option.DirectoryTreatment = directorySkip
+       }
+}
+
+// ReadDirectoryDisallow returns a ReadOptionFn that makes ReadMatches reject
+// directories.
+func ReadDirectoryDisallow() ReadOptionFn {
+       return func(option *readOption) {
+               option.DirectoryTreatment = directoryDisallow
+       }
+}
+
+// ReadMatches accepts the result of MatchFiles or MatchAll as a 
PCollection<FileMetadata> and
+// converts it to a PCollection<ReadableFile>. The ReadableFile can be used to 
retrieve file
+// metadata, open the file for reading or read the entire file into memory. 
ReadMatches accepts a
+// variadic number of ReadOptionFn that can be used to configure the 
compression type of the files
+// and treatment of directories. By default, the compression type is 
determined by the file
+// extension and directories are skipped.
+func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) 
beam.PCollection {
+       s = s.Scope("fileio.ReadMatches")
+
+       option := &readOption{
+               Compression:        compressionAuto,
+               DirectoryTreatment: directorySkip,
+       }
+
+       for _, opt := range opts {
+               opt(option)
+       }
+
+       return beam.ParDo(s, newReadFn(option), col)
+}
+
+// readFn is the DoFn applied by ReadMatches. Its fields carry the configured
+// compression type and directory treatment.
+type readFn struct {
+       Compression        compressionType
+       DirectoryTreatment directoryTreatment
+}
+
+// newReadFn builds a readFn that carries the compression type and directory
+// treatment from option.
+func newReadFn(option *readOption) *readFn {
+       fn := new(readFn)
+       fn.Compression = option.Compression
+       fn.DirectoryTreatment = option.DirectoryTreatment
+       return fn
+}
+
+// ProcessElement emits a ReadableFile for metadata, tagged with the configured
+// compression type. A path that isDirectory classifies as a directory is not
+// emitted: it is dropped silently, or rejected with an error when
+// fn.DirectoryTreatment is directoryDisallow.
+func (fn *readFn) ProcessElement(metadata FileMetadata, emit func(ReadableFile)) error {
+       if !isDirectory(metadata.Path) {
+               emit(ReadableFile{
+                       Metadata:    metadata,
+                       Compression: fn.Compression,
+               })
+               return nil
+       }
+
+       if fn.DirectoryTreatment != directoryDisallow {
+               return nil
+       }
+       return fmt.Errorf("path to directory not allowed: %q", metadata.Path)
+}
+
+// isDirectory reports whether path denotes a directory, judged only by a
+// trailing forward slash or backslash; the filesystem is not consulted.
+func isDirectory(path string) bool {
+       return strings.HasSuffix(path, "/") || strings.HasSuffix(path, "\\")
+}
diff --git a/sdks/go/pkg/beam/io/fileio/read_test.go 
b/sdks/go/pkg/beam/io/fileio/read_test.go
new file mode 100644
index 00000000000..b961d324f9a
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fileio/read_test.go
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fileio
+
+import (
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+func TestReadMatches(t *testing.T) {
+       tests := []struct {
+               name    string
+               opts    []ReadOptionFn
+               input   []any
+               want    []any
+               wantErr bool
+       }{
+               {
+                       name: "Read matches",
+                       input: []any{
+                               FileMetadata{
+                                       Path: "file1.txt",
+                                       Size: 5,
+                               },
+                               FileMetadata{
+                                       Path: "file2.txt",
+                                       Size: 0,
+                               },
+                       },
+                       want: []any{
+                               ReadableFile{
+                                       Metadata: FileMetadata{
+                                               Path: "file1.txt",
+                                               Size: 5,
+                                       },
+                                       Compression: compressionAuto,
+                               },
+                               ReadableFile{
+                                       Metadata: FileMetadata{
+                                               Path: "file2.txt",
+                                               Size: 0,
+                                       },
+                                       Compression: compressionAuto,
+                               },
+                       },
+               },
+               {
+                       name: "Read matches with specified compression",
+                       opts: []ReadOptionFn{
+                               ReadGzip(),
+                       },
+                       input: []any{
+                               FileMetadata{
+                                       Path: "file1",
+                                       Size: 5,
+                               },
+                               FileMetadata{
+                                       Path: "file2",
+                                       Size: 0,
+                               },
+                       },
+                       want: []any{
+                               ReadableFile{
+                                       Metadata: FileMetadata{
+                                               Path: "file1",
+                                               Size: 5,
+                                       },
+                                       Compression: compressionGzip,
+                               },
+                               ReadableFile{
+                                       Metadata: FileMetadata{
+                                               Path: "file2",
+                                               Size: 0,
+                                       },
+                                       Compression: compressionGzip,
+                               },
+                       },
+               },
+               {
+                       name: "Read matches and skip directories",
+                       input: []any{
+                               FileMetadata{
+                                       Path: "dir/",
+                                       Size: 0,
+                               },
+                               FileMetadata{
+                                       Path: "file1.txt",
+                                       Size: 5,
+                               },
+                       },
+                       want: []any{
+                               ReadableFile{
+                                       Metadata: FileMetadata{
+                                               Path: "file1.txt",
+                                               Size: 5,
+                                       },
+                                       Compression: compressionAuto,
+                               },
+                       },
+               },
+               {
+                       name: "Error - directories disallowed",
+                       opts: []ReadOptionFn{
+                               ReadDirectoryDisallow(),
+                       },
+                       input: []any{
+                               FileMetadata{
+                                       Path: "dir/",
+                                       Size: 0,
+                               },
+                               FileMetadata{
+                                       Path: "file1.txt",
+                                       Size: 5,
+                               },
+                       },
+                       wantErr: true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       t.Run(tt.name, func(t *testing.T) {
+                               p, s := beam.NewPipelineWithRoot()
+
+                               col := beam.Create(s, tt.input...)
+                               got := ReadMatches(s, col, tt.opts...)
+
+                               passert.Equals(s, got, tt.want...)
+                               if err := ptest.Run(p); (err != nil) != 
tt.wantErr {
+                                       t.Errorf("ReadMatches() error = %v, 
wantErr %v", err, tt.wantErr)
+                               }
+                       })
+               })
+       }
+}
+
+// Test_isDirectory checks that isDirectory treats paths ending in a forward
+// slash or a backslash as directories and other paths as files.
+func Test_isDirectory(t *testing.T) {
+       tests := []struct {
+               name string
+               path string
+               want bool
+       }{
+               {
+                       name: "Path to directory with forward slash directory 
separator",
+                       path: "path/to/",
+                       want: true,
+               },
+               {
+                       name: "Path to directory with backslash directory 
separator",
+                       path: "path\\to\\",
+                       want: true,
+               },
+               {
+                       name: "Path to file",
+                       path: "path/to/file",
+                       want: false,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       if got := isDirectory(tt.path); got != tt.want {
+                               t.Errorf("isDirectory() = %v, want %v", got, 
tt.want)
+                       }
+               })
+       }
+}


Reply via email to