This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 05eba4cfc86 Enable fileio.MatchContinuously to emit duplicate file if modified (#26524)
05eba4cfc86 is described below

commit 05eba4cfc86785f18ee4b7625bbb2f564afef95b
Author: Johanna Öjeling <51084516+johannaojel...@users.noreply.github.com>
AuthorDate: Tue May 16 00:13:54 2023 +0200

    Enable fileio.MatchContinuously to emit duplicate file if modified (#26524)
---
 sdks/go/pkg/beam/io/fileio/file.go        |  9 ++-
 sdks/go/pkg/beam/io/fileio/helper_test.go | 12 ++++
 sdks/go/pkg/beam/io/fileio/match.go       | 96 +++++++++++++++++++++++++++----
 sdks/go/pkg/beam/io/fileio/match_test.go  | 51 ++++++++++------
 4 files changed, 139 insertions(+), 29 deletions(-)

diff --git a/sdks/go/pkg/beam/io/fileio/file.go b/sdks/go/pkg/beam/io/fileio/file.go
index 4ae7b3d3d07..fc7f64ff4f7 100644
--- a/sdks/go/pkg/beam/io/fileio/file.go
+++ b/sdks/go/pkg/beam/io/fileio/file.go
@@ -21,6 +21,7 @@ import (
        "io"
        "path/filepath"
        "reflect"
+       "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
@@ -32,10 +33,12 @@ func init() {
        beam.RegisterType(reflect.TypeOf((*ReadableFile)(nil)).Elem())
 }
 
-// FileMetadata contains metadata about a file, namely its path and size in bytes.
+// FileMetadata contains metadata about a file, namely its path, size in bytes and last modified
+// time.
 type FileMetadata struct {
-       Path string
-       Size int64
+       Path         string
+       Size         int64
+       LastModified time.Time
 }
 
 // compressionType is the type of compression used to compress a file.
diff --git a/sdks/go/pkg/beam/io/fileio/helper_test.go b/sdks/go/pkg/beam/io/fileio/helper_test.go
index e8b61ef067f..08029759593 100644
--- a/sdks/go/pkg/beam/io/fileio/helper_test.go
+++ b/sdks/go/pkg/beam/io/fileio/helper_test.go
@@ -22,6 +22,7 @@ import (
        "os"
        "path/filepath"
        "testing"
+       "time"
 )
 
 // openFile opens a file for reading.
@@ -86,3 +87,14 @@ func writeGzip(t *testing.T, path string, data []byte) {
                t.Fatal(err)
        }
 }
+
+func modTime(t *testing.T, path string) time.Time {
+       t.Helper()
+
+       info, err := os.Stat(path)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       return info.ModTime()
+}
diff --git a/sdks/go/pkg/beam/io/fileio/match.go b/sdks/go/pkg/beam/io/fileio/match.go
index aeee887f7a1..dbd4b215025 100644
--- a/sdks/go/pkg/beam/io/fileio/match.go
+++ b/sdks/go/pkg/beam/io/fileio/match.go
@@ -27,6 +27,7 @@ import (
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
 )
@@ -37,6 +38,9 @@ func init() {
        register.DoFn4x1[state.Provider, string, FileMetadata, 
func(FileMetadata), error](
                &dedupFn{},
        )
+       register.DoFn4x1[state.Provider, string, FileMetadata, 
func(FileMetadata), error](
+               &dedupUnmodifiedFn{},
+       )
        register.Emitter1[FileMetadata]()
        register.Emitter1[string]()
        register.Function1x2[FileMetadata, string, FileMetadata](keyByPath)
@@ -190,21 +194,45 @@ func metadataFromFiles(
                        return nil, err
                }
 
+               mTime, err := lastModified(ctx, fs, path)
+               if err != nil {
+                       return nil, err
+               }
+
                metadata[i] = FileMetadata{
-                       Path: path,
-                       Size: size,
+                       Path:         path,
+                       Size:         size,
+                       LastModified: mTime,
                }
        }
 
        return metadata, nil
 }
 
+func lastModified(ctx context.Context, fs filesystem.Interface, path string) 
(time.Time, error) {
+       lmGetter, ok := fs.(filesystem.LastModifiedGetter)
+       if !ok {
+               log.Warnf(ctx, "Filesystem %T does not implement 
filesystem.LastModifiedGetter", fs)
+               return time.Time{}, nil
+       }
+
+       mTime, err := lmGetter.LastModified(ctx, path)
+       if err != nil {
+               return time.Time{}, fmt.Errorf("error getting last modified 
time for %q: %v", path, err)
+       }
+
+       return mTime, nil
+}
+
 // duplicateTreatment controls how duplicate matches are treated.
 type duplicateTreatment int
 
 const (
        // duplicateAllow allows duplicate matches.
        duplicateAllow duplicateTreatment = iota
+       // duplicateAllowIfModified allows duplicate matches only if the file has been modified since it
+       // was last observed.
+       duplicateAllowIfModified
        // duplicateSkip skips duplicate matches.
        duplicateSkip
 )
@@ -241,6 +269,14 @@ func MatchDuplicateAllow() MatchContOptionFn {
        }
 }
 
+// MatchDuplicateAllowIfModified specifies that file path matches will be deduplicated unless the
+// file has been modified since it was last observed.
+func MatchDuplicateAllowIfModified() MatchContOptionFn {
+       return func(o *matchContOption) {
+               o.DuplicateTreatment = duplicateAllowIfModified
+       }
+}
+
 // MatchDuplicateSkip specifies that file path matches will be deduplicated.
 func MatchDuplicateSkip() MatchContOptionFn {
        return func(o *matchContOption) {
@@ -262,6 +298,8 @@ func MatchApplyWindow() MatchContOptionFn {
 //   - Start: start time for matching files. Defaults to the current timestamp
 //   - End: end time for matching files. Defaults to the maximum timestamp
 //   - DuplicateAllow: allow emitting matches that have already been observed. Defaults to false
+//   - DuplicateAllowIfModified: allow emitting matches that have already been observed if the file
+//     has been modified since the last observation. Defaults to false
 //   - DuplicateSkip: skip emitting matches that have already been observed. Defaults to true
 //   - ApplyWindow: assign each element to an individual window with a fixed size equivalent to the
 //     interval. Defaults to false, i.e. all elements will reside in the global window
@@ -290,14 +328,7 @@ func MatchContinuously(
        globs := beam.ParDo(s, &matchContFn{Glob: glob}, imp)
        matches := MatchAll(s, globs, MatchEmptyAllow())
 
-       var out beam.PCollection
-
-       if option.DuplicateTreatment == duplicateAllow {
-               out = matches
-       } else {
-               keyed := beam.ParDo(s, keyByPath, matches)
-               out = beam.ParDo(s, &dedupFn{}, keyed)
-       }
+       out := dedupIfRequired(s, matches, option.DuplicateTreatment)
 
        if option.ApplyWindow {
                return beam.WindowInto(s, window.NewFixedWindows(interval), out)
@@ -305,6 +336,24 @@ func MatchContinuously(
        return out
 }
 
+func dedupIfRequired(
+       s beam.Scope,
+       col beam.PCollection,
+       treatment duplicateTreatment,
+) beam.PCollection {
+       if treatment == duplicateAllow {
+               return col
+       }
+
+       keyed := beam.ParDo(s, keyByPath, col)
+
+       if treatment == duplicateAllowIfModified {
+               return beam.ParDo(s, &dedupUnmodifiedFn{}, keyed)
+       }
+
+       return beam.ParDo(s, &dedupFn{}, keyed)
+}
+
 type matchContFn struct {
        Glob string
 }
@@ -341,3 +390,30 @@ func (fn *dedupFn) ProcessElement(
 
        return nil
 }
+
+type dedupUnmodifiedFn struct {
+       State state.Value[int64]
+}
+
+func (fn *dedupUnmodifiedFn) ProcessElement(
+       sp state.Provider,
+       _ string,
+       md FileMetadata,
+       emit func(FileMetadata),
+) error {
+       prevMTime, ok, err := fn.State.Read(sp)
+       if err != nil {
+               return fmt.Errorf("error reading state: %v", err)
+       }
+
+       mTime := md.LastModified.UnixMilli()
+
+       if !ok || mTime > prevMTime {
+               emit(md)
+               if err := fn.State.Write(sp, mTime); err != nil {
+                       return fmt.Errorf("error writing state: %v", err)
+               }
+       }
+
+       return nil
+}
diff --git a/sdks/go/pkg/beam/io/fileio/match_test.go b/sdks/go/pkg/beam/io/fileio/match_test.go
index 57b2d8cfe1c..5bc849e5057 100644
--- a/sdks/go/pkg/beam/io/fileio/match_test.go
+++ b/sdks/go/pkg/beam/io/fileio/match_test.go
@@ -55,6 +55,9 @@ func TestMatchFiles(t *testing.T) {
                write(t, filepath.Join(testDir, tf.filename), tf.data)
        }
 
+       fp1 := filepath.Join(testDir, "file1.txt")
+       fp2 := filepath.Join(testDir, "file2.txt")
+
        tests := []struct {
                name string
                glob string
@@ -66,12 +69,14 @@ func TestMatchFiles(t *testing.T) {
                        glob: filepath.Join(dir, "*", "file*.txt"),
                        want: []any{
                                FileMetadata{
-                                       Path: filepath.Join(testDir, 
"file1.txt"),
-                                       Size: 5,
+                                       Path:         fp1,
+                                       Size:         5,
+                                       LastModified: modTime(t, fp1),
                                },
                                FileMetadata{
-                                       Path: filepath.Join(testDir, 
"file2.txt"),
-                                       Size: 0,
+                                       Path:         fp2,
+                                       Size:         0,
+                                       LastModified: modTime(t, fp2),
                                },
                        },
                },
@@ -104,6 +109,10 @@ func TestMatchAll(t *testing.T) {
                write(t, filepath.Join(testDir, tf.filename), tf.data)
        }
 
+       fp1 := filepath.Join(testDir, "file1.txt")
+       fp2 := filepath.Join(testDir, "file2.txt")
+       fp3 := filepath.Join(testDir, "file3.csv")
+
        tests := []struct {
                name    string
                opts    []MatchOptionFn
@@ -119,16 +128,19 @@ func TestMatchAll(t *testing.T) {
                        },
                        want: []any{
                                FileMetadata{
-                                       Path: filepath.Join(testDir, 
"file1.txt"),
-                                       Size: 5,
+                                       Path:         fp1,
+                                       Size:         5,
+                                       LastModified: modTime(t, fp1),
                                },
                                FileMetadata{
-                                       Path: filepath.Join(testDir, 
"file2.txt"),
-                                       Size: 0,
+                                       Path:         fp2,
+                                       Size:         0,
+                                       LastModified: modTime(t, fp2),
                                },
                                FileMetadata{
-                                       Path: filepath.Join(testDir, 
"file3.csv"),
-                                       Size: 5,
+                                       Path:         fp3,
+                                       Size:         5,
+                                       LastModified: modTime(t, fp3),
                                },
                        },
                },
@@ -238,6 +250,10 @@ func Test_metadataFromFiles(t *testing.T) {
                files[i] = file
        }
 
+       fp1 := filepath.Join(dir, "file1.txt")
+       fp2 := filepath.Join(dir, "file2.txt")
+       fp3 := filepath.Join(dir, "file3.csv")
+
        tests := []struct {
                name  string
                files []string
@@ -248,16 +264,19 @@ func Test_metadataFromFiles(t *testing.T) {
                        files: files,
                        want: []FileMetadata{
                                {
-                                       Path: filepath.Join(dir, "file1.txt"),
-                                       Size: 5,
+                                       Path:         fp1,
+                                       Size:         5,
+                                       LastModified: modTime(t, fp1),
                                },
                                {
-                                       Path: filepath.Join(dir, "file2.txt"),
-                                       Size: 0,
+                                       Path:         fp2,
+                                       Size:         0,
+                                       LastModified: modTime(t, fp2),
                                },
                                {
-                                       Path: filepath.Join(dir, "file3.csv"),
-                                       Size: 5,
+                                       Path:         fp3,
+                                       Size:         5,
+                                       LastModified: modTime(t, fp3),
                                },
                        },
                },

Reply via email to