This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new dbe72830b11 Add a test for getting state with MultimapSideInput 
StateKey (#31757)
dbe72830b11 is described below

commit dbe72830b11c2603e062e32295dfb2e3efedbaa9
Author: mls3odp <linju...@google.com>
AuthorDate: Wed Jul 3 11:55:16 2024 -0700

    Add a test for getting state with MultimapSideInput StateKey (#31757)
---
 .../runners/prism/internal/worker/worker_test.go   | 81 +++++++++++++++++++++-
 1 file changed, 80 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
index e5b03214ae0..469e0e2f3d8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
@@ -18,13 +18,14 @@ package worker
 import (
        "bytes"
        "context"
-       "github.com/google/go-cmp/cmp"
        "net"
        "sort"
        "sync"
        "testing"
        "time"
 
+       "github.com/google/go-cmp/cmp"
+
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
@@ -386,3 +387,81 @@ func TestWorker_State_MultimapKeysSideInput(t *testing.T) {
                })
        }
 }
+
+func TestWorker_State_MultimapSideInput(t *testing.T) {
+       for _, tt := range []struct {
+               name string
+               w    typex.Window
+       }{
+               {
+                       name: "global window",
+                       w:    window.GlobalWindow{},
+               },
+               {
+                       name: "interval window",
+                       w: window.IntervalWindow{
+                               Start: 1000,
+                               End:   2000,
+                       },
+               },
+       } {
+               t.Run(tt.name, func(t *testing.T) {
+                       var encW []byte
+                       if !tt.w.Equals(window.GlobalWindow{}) {
+                               buf := bytes.Buffer{}
+                               if err := 
exec.MakeWindowEncoder(coder.NewIntervalWindow()).EncodeSingle(tt.w, &buf); err 
!= nil {
+                                       t.Fatalf("error encoding window: %v, 
err: %v", tt.w, err)
+                               }
+                               encW = buf.Bytes()
+                       }
+                       wk, stateStream, done := serveTestWorkerStateStream(t)
+                       defer done()
+                       instID := wk.NextInst()
+                       wk.activeInstructions[instID] = &B{
+                               MultiMapSideInputData: 
map[SideInputKey]map[typex.Window]map[string][][]byte{
+                                       SideInputKey{
+                                               TransformID: "transformID",
+                                               Local:       "i1",
+                                       }: {
+                                               tt.w: map[string][][]byte{"a": 
{{5}}, "b": {{12}}},
+                                       },
+                               },
+                       }
+                       var testKey = []string{"a", "b", "x"}
+                       expectedResult := map[string][]int{
+                               "a": {5},
+                               "b": {12},
+                       }
+                       for _, key := range testKey {
+                               stateStream.Send(&fnpb.StateRequest{
+                                       Id:            "first",
+                                       InstructionId: instID,
+                                       Request: &fnpb.StateRequest_Get{
+                                               Get: &fnpb.StateGetRequest{},
+                                       },
+                                       StateKey: &fnpb.StateKey{Type: 
&fnpb.StateKey_MultimapSideInput_{
+                                               MultimapSideInput: 
&fnpb.StateKey_MultimapSideInput{
+                                                       TransformId: 
"transformID",
+                                                       SideInputId: "i1",
+                                                       Window:      encW,
+                                                       Key:         
[]byte(key),
+                                               },
+                                       }},
+                               })
+
+                               resp, err := stateStream.Recv()
+                               if err != nil {
+                                       t.Fatal("Couldn't receive state 
response:", err)
+                               }
+
+                               var got []int
+                               for _, b := range resp.GetGet().GetData() {
+                                       got = append(got, int(b))
+                               }
+                               if !cmp.Equal(got, expectedResult[key]) {
+                                       t.Errorf("For test key: %v, didn't 
receive expected state response data: got %v, want %v", key, got, 
expectedResult[key])
+                               }
+                       }
+               })
+       }
+}

Reply via email to