This is an automated email from the ASF dual-hosted git repository.

danoliveira pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b9d99405aba Add ExecuteBundles transform to Go FhirIO (#21840)
b9d99405aba is described below

commit b9d99405abaf74889eb9cf05eec4d67ea275a6b4
Author: Lucas Nogueira <lnogu...@uwaterloo.ca>
AuthorDate: Tue Jun 21 20:30:22 2022 -0400

    Add ExecuteBundles transform to Go FhirIO (#21840)
    
    * squashed rebase conflict commits
    
    * add execute bundles transform with integration test
    
    * adjust import to follow convention
    
    * fix variable scope
    
    * adjust read unit tests
    
    * fix bad status test case
    
    * improve read unit test assertions and naming
    
    * add unit tests
    
    * add comment to execute bundles transform
    
    * include error reason
    
    * make variable exported to fix integration test
    
    * adjust integration tests after merge
    
    * update license comment
    
    * unify unit test utilities in single file
    
    * add comment explaining the purpose of unexported transform function
    
    * remove unnecessary generic usage
    
    * remove coded mistakenly added
    
    * improve transaction vs batch bundle comment
    
    * use net/http constants instead of hardcoded values
    
    * return error instead of t.Fail inside helper function
    
    * improve execute bundles DoFn identifier
    
    * adjust import spacing
    
    * improve error message
---
 sdks/go/pkg/beam/io/fhirio/common.go               |  56 ++++++++
 sdks/go/pkg/beam/io/fhirio/execute_bundles.go      | 146 +++++++++++++++++++++
 sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go |  82 ++++++++++++
 sdks/go/pkg/beam/io/fhirio/fakes_test.go           |  39 ------
 sdks/go/pkg/beam/io/fhirio/read.go                 |  45 +++----
 sdks/go/pkg/beam/io/fhirio/read_test.go            |  55 ++------
 sdks/go/pkg/beam/io/fhirio/utils_test.go           | 107 +++++++++++++++
 sdks/go/test/integration/io/fhirio/fhirio_test.go  |  46 +++++--
 8 files changed, 456 insertions(+), 120 deletions(-)

diff --git a/sdks/go/pkg/beam/io/fhirio/common.go 
b/sdks/go/pkg/beam/io/fhirio/common.go
index 98bc10e3306..666f9d6f0e0 100644
--- a/sdks/go/pkg/beam/io/fhirio/common.go
+++ b/sdks/go/pkg/beam/io/fhirio/common.go
@@ -19,10 +19,16 @@
 package fhirio
 
 import (
+       "bytes"
        "context"
+       "io"
        "net/http"
+       "regexp"
+       "time"
 
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        "google.golang.org/api/healthcare/v1"
        "google.golang.org/api/option"
 )
@@ -32,8 +38,54 @@ const (
        baseMetricPrefix = "fhirio/"
 )
 
+func executeRequestAndRecordLatency(ctx context.Context, latencyMs 
*beam.Distribution, requestSupplier func() (*http.Response, error)) 
(*http.Response, error) {
+       timeBeforeReadRequest := time.Now()
+       response, err := requestSupplier()
+       latencyMs.Update(ctx, time.Since(timeBeforeReadRequest).Milliseconds())
+       return response, err
+}
+
+func extractBodyFrom(response *http.Response) (string, error) {
+       if isBadStatusCode(response.Status) {
+               return "", errors.Errorf("response contains bad status: [%v]", 
response.Status)
+       }
+
+       bodyBytes, err := io.ReadAll(response.Body)
+       if err != nil {
+               return "", err
+       }
+
+       return string(bodyBytes), nil
+}
+
+func isBadStatusCode(status string) bool {
+       // 2XXs are successes, otherwise failure.
+       isMatch, err := regexp.MatchString("^2\\d{2}", status)
+       if err != nil {
+               return true
+       }
+       return !isMatch
+}
+
+type fhirioFnCommon struct {
+       client                fhirStoreClient
+       resourcesErrorCount   beam.Counter
+       resourcesSuccessCount beam.Counter
+       latencyMs             beam.Distribution
+}
+
+func (fnc *fhirioFnCommon) setup(namespace string) {
+       if fnc.client == nil {
+               fnc.client = newFhirStoreClient()
+       }
+       fnc.resourcesErrorCount = beam.NewCounter(namespace, 
baseMetricPrefix+"resource_error_count")
+       fnc.resourcesSuccessCount = beam.NewCounter(namespace, 
baseMetricPrefix+"resource_success_count")
+       fnc.latencyMs = beam.NewDistribution(namespace, 
baseMetricPrefix+"latency_ms")
+}
+
 type fhirStoreClient interface {
        readResource(resourcePath string) (*http.Response, error)
+       executeBundle(storePath string, bundle []byte) (*http.Response, error)
 }
 
 type fhirStoreClientImpl struct {
@@ -51,3 +103,7 @@ func newFhirStoreClient() *fhirStoreClientImpl {
 func (c *fhirStoreClientImpl) readResource(resourcePath string) 
(*http.Response, error) {
        return c.fhirService.Read(resourcePath).Do()
 }
+
+func (c *fhirStoreClientImpl) executeBundle(storePath string, bundle []byte) 
(*http.Response, error) {
+       return c.fhirService.ExecuteBundle(storePath, 
bytes.NewReader(bundle)).Do()
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/execute_bundles.go 
b/sdks/go/pkg/beam/io/fhirio/execute_bundles.go
new file mode 100644
index 00000000000..c96d72fae39
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/execute_bundles.go
@@ -0,0 +1,146 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package fhirio provides an API for reading and writing resources to Google
+// Cloud Healthcare Fhir stores.
+// Experimental.
+package fhirio
+
+import (
+       "bytes"
+       "context"
+       "encoding/json"
+       "net/http"
+       "strings"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
+)
+
+const (
+       bundleResponseTypeBatch       = "batch-response"
+       bundleResponseTypeTransaction = "transaction-response"
+)
+
+func init() {
+       register.DoFn4x0[context.Context, []byte, func(string), 
func(string)]((*executeBundleFn)(nil))
+       register.Emitter1[string]()
+}
+
+type executeBundleFn struct {
+       fhirioFnCommon
+       successesCount beam.Counter
+       // Path to FHIR store where bundle requests will be executed on.
+       FhirStorePath string
+}
+
+func (fn executeBundleFn) String() string {
+       return "executeBundleFn"
+}
+
+func (fn *executeBundleFn) Setup() {
+       fn.fhirioFnCommon.setup(fn.String())
+       fn.successesCount = beam.NewCounter(fn.String(), 
baseMetricPrefix+"success_count")
+}
+
+func (fn *executeBundleFn) ProcessElement(ctx context.Context, inputBundleBody 
[]byte, emitSuccess, emitFailure func(string)) {
+       response, err := executeRequestAndRecordLatency(ctx, &fn.latencyMs, 
func() (*http.Response, error) {
+               return fn.client.executeBundle(fn.FhirStorePath, 
inputBundleBody)
+       })
+       if err != nil {
+               fn.resourcesErrorCount.Inc(ctx, 1)
+               emitFailure(errors.Wrap(err, "execute bundle request returned 
error").Error())
+               return
+       }
+
+       body, err := extractBodyFrom(response)
+       if err != nil {
+               fn.resourcesErrorCount.Inc(ctx, 1)
+               emitFailure(errors.Wrap(err, "could not extract body from 
execute bundles response").Error())
+               return
+       }
+
+       fn.processResponseBody(ctx, body, emitSuccess, emitFailure)
+}
+
+func (fn *executeBundleFn) processResponseBody(ctx context.Context, body 
string, emitSuccess, emitFailure func(string)) {
+       var bodyFields struct {
+               Type    string        `json:"type"`
+               Entries []interface{} `json:"entry"`
+       }
+
+       err := json.NewDecoder(strings.NewReader(body)).Decode(&bodyFields)
+       if err != nil {
+               fn.resourcesErrorCount.Inc(ctx, 1)
+               emitFailure(errors.Wrap(err, "could not parse body from execute 
bundle response").Error())
+               return
+       }
+
+       if bodyFields.Entries == nil {
+               return
+       }
+
+       // A BATCH bundle returns a success response even if entries have 
failures, as
+       // entries are executed separately. However, TRANSACTION bundles should 
return
+       // error response (in client.executeBundle call) if any entry fails. 
Therefore,
+       // for BATCH bundles we need to parse the error and success counters.
+       switch bodyFields.Type {
+       case bundleResponseTypeTransaction:
+               fn.resourcesSuccessCount.Inc(ctx, 
int64(len(bodyFields.Entries)))
+               emitSuccess(body)
+       case bundleResponseTypeBatch:
+               for _, entry := range bodyFields.Entries {
+                       var entryFields struct {
+                               Response struct {
+                                       Status string `json:"status"`
+                               } `json:"response"`
+                       }
+                       entryBytes, _ := json.Marshal(entry)
+                       _ = 
json.NewDecoder(bytes.NewReader(entryBytes)).Decode(&entryFields)
+                       if entryFields.Response.Status == "" {
+                               continue
+                       }
+
+                       if isBadStatusCode(entryFields.Response.Status) {
+                               fn.resourcesErrorCount.Inc(ctx, 1)
+                               emitFailure(errors.Errorf("execute bundles 
entry contains bad status: [%v]", entryFields.Response.Status).Error())
+                       } else {
+                               fn.resourcesSuccessCount.Inc(ctx, 1)
+                               emitSuccess(string(entryBytes))
+                       }
+               }
+       }
+
+       fn.successesCount.Inc(ctx, 1)
+}
+
+// ExecuteBundles performs all the requests in the specified bundles on a given
+// FHIR store. This transform takes a path to a FHIR store and a PCollection of
+// bundles as JSON-encoded strings. It executes the requests defined on the
+// bundles on the FHIR store located on the provided path. It outputs two
+// PCollection<string>, the first containing the response bodies of the
+// successfully performed requests and the second one error messages of the
+// requests that failed to be executed.
+// See: 
https://cloud.google.com/healthcare-api/docs/samples/healthcare-fhir-execute-bundle
+func ExecuteBundles(s beam.Scope, fhirStorePath string, bundles 
beam.PCollection) (beam.PCollection, beam.PCollection) {
+       s = s.Scope("fhirio.ExecuteBundles")
+       return executeBundles(s, fhirStorePath, bundles, nil)
+}
+
+// This is useful as an entry point for testing because we can provide a fake 
FHIR store client.
+func executeBundles(s beam.Scope, fhirStorePath string, bundles 
beam.PCollection, client fhirStoreClient) (beam.PCollection, beam.PCollection) {
+       return beam.ParDo2(s, &executeBundleFn{fhirioFnCommon: 
fhirioFnCommon{client: client}, FhirStorePath: fhirStorePath}, bundles)
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go 
b/sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go
new file mode 100644
index 00000000000..c0eb1174bb1
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/execute_bundles_test.go
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+       "bytes"
+       "net/http"
+       "strings"
+       "testing"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+       "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
+)
+
+func TestExecuteBundles(t *testing.T) {
+       testCases := []struct {
+               name           string
+               client         fhirStoreClient
+               containedError string
+       }{
+               {
+                       name:           "Execute Bundles request returns error",
+                       client:         requestReturnErrorFakeClient,
+                       containedError: fakeRequestReturnErrorMessage,
+               },
+               {
+                       name:           "Execute Bundles request returns bad 
status",
+                       client:         badStatusFakeClient,
+                       containedError: fakeBadStatus,
+               },
+               {
+                       name:           "Execute Bundles request response body 
fails to be read",
+                       client:         bodyReaderErrorFakeClient,
+                       containedError: fakeBodyReaderErrorMessage,
+               },
+               {
+                       name: "Execute Bundles request response body failed to 
be decoded",
+                       client: &fakeFhirStoreClient{
+                               fakeExecuteBundles: func(storePath string, 
bundle []byte) (*http.Response, error) {
+                                       return &http.Response{
+                                               Body: &fakeReaderCloser{
+                                                       fakeRead: func(t 
[]byte) (int, error) {
+                                                               return 
bytes.NewReader([]byte("")).Read(t)
+                                                       },
+                                               }, Status: "200 Ok"}, nil
+                               },
+                       },
+                       containedError: "EOF",
+               },
+       }
+
+       testBundles := [][]byte{[]byte("foo"), []byte("bar")}
+       for _, testCase := range testCases {
+               t.Run(testCase.name, func(t *testing.T) {
+                       p, s, bundles := ptest.CreateList(testBundles)
+                       successfulBodies, failures := executeBundles(s, "bla", 
bundles, testCase.client)
+                       passert.Empty(s, successfulBodies)
+                       passert.Count(s, failures, "", len(testBundles))
+                       passert.True(s, failures, func(errorMsg string) bool {
+                               return strings.Contains(errorMsg, 
testCase.containedError)
+                       })
+                       pipelineResult := ptest.RunAndValidate(t, p)
+                       err := validateResourceErrorCounter(pipelineResult, 
len(testBundles))
+                       if err != nil {
+                               t.Fatalf("validateResourceErrorCounter returned 
error [%v]", err.Error())
+                       }
+               })
+       }
+}
diff --git a/sdks/go/pkg/beam/io/fhirio/fakes_test.go 
b/sdks/go/pkg/beam/io/fhirio/fakes_test.go
deleted file mode 100644
index 4e1a51aeb23..00000000000
--- a/sdks/go/pkg/beam/io/fhirio/fakes_test.go
+++ /dev/null
@@ -1,39 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package fhirio
-
-import "net/http"
-
-type fakeFhirStoreClient struct {
-       fakeReadResources func(string) (*http.Response, error)
-}
-
-func (c *fakeFhirStoreClient) readResource(resourcePath string) 
(*http.Response, error) {
-       return c.fakeReadResources(resourcePath)
-}
-
-// Useful to fake the Body of a http.Response.
-type fakeReaderCloser struct {
-       fakeRead func([]byte) (int, error)
-}
-
-func (*fakeReaderCloser) Close() error {
-       return nil
-}
-
-func (m *fakeReaderCloser) Read(b []byte) (int, error) {
-       return m.fakeRead(b)
-}
diff --git a/sdks/go/pkg/beam/io/fhirio/read.go 
b/sdks/go/pkg/beam/io/fhirio/read.go
index a710c92a869..41c53a540b6 100644
--- a/sdks/go/pkg/beam/io/fhirio/read.go
+++ b/sdks/go/pkg/beam/io/fhirio/read.go
@@ -20,8 +20,7 @@ package fhirio
 
 import (
        "context"
-       "io"
-       "time"
+       "net/http"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
@@ -34,10 +33,7 @@ func init() {
 }
 
 type readResourceFn struct {
-       client                fhirStoreClient
-       readResourceErrors    beam.Counter
-       readResourceSuccess   beam.Counter
-       readResourceLatencyMs beam.Distribution
+       fhirioFnCommon
 }
 
 func (fn readResourceFn) String() string {
@@ -45,40 +41,28 @@ func (fn readResourceFn) String() string {
 }
 
 func (fn *readResourceFn) Setup() {
-       if fn.client == nil {
-               fn.client = newFhirStoreClient()
-       }
-       fn.readResourceErrors = beam.NewCounter(fn.String(), 
baseMetricPrefix+"read_resource_error_count")
-       fn.readResourceSuccess = beam.NewCounter(fn.String(), 
baseMetricPrefix+"read_resource_success_count")
-       fn.readResourceLatencyMs = beam.NewDistribution(fn.String(), 
baseMetricPrefix+"read_resource_latency_ms")
+       fn.fhirioFnCommon.setup(fn.String())
 }
 
 func (fn *readResourceFn) ProcessElement(ctx context.Context, resourcePath 
string, emitResource, emitDeadLetter func(string)) {
-       timeBeforeReadRequest := time.Now()
-       response, err := fn.client.readResource(resourcePath)
-       fn.readResourceLatencyMs.Update(ctx, 
time.Since(timeBeforeReadRequest).Milliseconds())
-
+       response, err := executeRequestAndRecordLatency(ctx, &fn.latencyMs, 
func() (*http.Response, error) {
+               return fn.client.readResource(resourcePath)
+       })
        if err != nil {
-               fn.readResourceErrors.Inc(ctx, 1)
-               emitDeadLetter(errors.Wrapf(err, "failed fetching resource 
[%s]", resourcePath).Error())
-               return
-       }
-
-       if response.StatusCode != 200 {
-               fn.readResourceErrors.Inc(ctx, 1)
-               emitDeadLetter(errors.Errorf("fetched resource [%s] returned 
bad status [%d]", resourcePath, response.StatusCode).Error())
+               fn.resourcesErrorCount.Inc(ctx, 1)
+               emitDeadLetter(errors.Wrapf(err, "read resource request 
returned error on input: [%v]", resourcePath).Error())
                return
        }
 
-       bytes, err := io.ReadAll(response.Body)
+       body, err := extractBodyFrom(response)
        if err != nil {
-               fn.readResourceErrors.Inc(ctx, 1)
-               emitDeadLetter(errors.Wrapf(err, "error reading response body 
of resource [%s]", resourcePath).Error())
+               fn.resourcesErrorCount.Inc(ctx, 1)
+               emitDeadLetter(errors.Wrapf(err, "could not extract body from 
read resource [%v] response", resourcePath).Error())
                return
        }
 
-       fn.readResourceSuccess.Inc(ctx, 1)
-       emitResource(string(bytes))
+       fn.resourcesSuccessCount.Inc(ctx, 1)
+       emitResource(body)
 }
 
 // Read fetches resources from Google Cloud Healthcare FHIR stores based on the
@@ -93,6 +77,7 @@ func Read(s beam.Scope, resourcePaths beam.PCollection) 
(beam.PCollection, beam.
        return read(s, resourcePaths, nil)
 }
 
+// This is useful as an entry point for testing because we can provide a fake 
FHIR store client.
 func read(s beam.Scope, resourcePaths beam.PCollection, client 
fhirStoreClient) (beam.PCollection, beam.PCollection) {
-       return beam.ParDo2(s, &readResourceFn{client: client}, resourcePaths)
+       return beam.ParDo2(s, &readResourceFn{fhirioFnCommon: 
fhirioFnCommon{client: client}}, resourcePaths)
 }
diff --git a/sdks/go/pkg/beam/io/fhirio/read_test.go 
b/sdks/go/pkg/beam/io/fhirio/read_test.go
index 7b8cba1af8d..39ae484b92b 100644
--- a/sdks/go/pkg/beam/io/fhirio/read_test.go
+++ b/sdks/go/pkg/beam/io/fhirio/read_test.go
@@ -16,8 +16,6 @@
 package fhirio
 
 import (
-       "errors"
-       "net/http"
        "strings"
        "testing"
 
@@ -32,35 +30,19 @@ func TestRead(t *testing.T) {
                containedError string
        }{
                {
-                       name: "Read Request Failed",
-                       client: &fakeFhirStoreClient{
-                               fakeReadResources: func(resource string) 
(*http.Response, error) {
-                                       return nil, errors.New("")
-                               },
-                       },
-                       containedError: "failed fetching resource",
+                       name:           "Read request returns error",
+                       client:         requestReturnErrorFakeClient,
+                       containedError: fakeRequestReturnErrorMessage,
                },
                {
-                       name: "Read Request Returns Bad Status",
-                       client: &fakeFhirStoreClient{
-                               fakeReadResources: func(resource string) 
(*http.Response, error) {
-                                       return &http.Response{StatusCode: 403}, 
nil
-                               },
-                       },
-                       containedError: "returned bad status",
+                       name:           "Read request returns bad status",
+                       client:         badStatusFakeClient,
+                       containedError: fakeBadStatus,
                },
                {
-                       name: "Response body fails to be parsed",
-                       client: &fakeFhirStoreClient{
-                               fakeReadResources: func(resource string) 
(*http.Response, error) {
-                                       return &http.Response{Body: 
&fakeReaderCloser{
-                                               fakeRead: func([]byte) (int, 
error) {
-                                                       return 0, errors.New("")
-                                               },
-                                       }, StatusCode: 200}, nil
-                               },
-                       },
-                       containedError: "error reading response body",
+                       name:           "Read request response body fails to be 
read",
+                       client:         bodyReaderErrorFakeClient,
+                       containedError: fakeBodyReaderErrorMessage,
                },
        }
 
@@ -75,23 +57,10 @@ func TestRead(t *testing.T) {
                                return strings.Contains(errorMsg, 
testCase.containedError)
                        })
                        pipelineResult := ptest.RunAndValidate(t, p)
-                       counterResults := 
pipelineResult.Metrics().AllMetrics().Counters()
-
-                       if len(counterResults) != 1 {
-                               t.Fatalf("counterResults got length %v, 
expected %v", len(counterResults), 1)
-                       }
-                       counterResult := counterResults[0]
-
-                       expectedCounterName := 
"fhirio/read_resource_error_count"
-                       if counterResult.Name() != expectedCounterName {
-                               t.Fatalf("counterResult.Name() is '%v', 
expected '%v'", counterResult.Name(), expectedCounterName)
+                       err := validateResourceErrorCounter(pipelineResult, 
len(testResourcePaths))
+                       if err != nil {
+                               t.Fatalf("validateResourceErrorCounter returned 
error [%v]", err.Error())
                        }
-
-                       expectedCounterResult := int64(len(testResourcePaths))
-                       if counterResult.Result() != expectedCounterResult {
-                               t.Fatalf("counterResult.Result() is %v, 
expected %v", counterResult.Result(), expectedCounterResult)
-                       }
-
                })
        }
 }
diff --git a/sdks/go/pkg/beam/io/fhirio/utils_test.go 
b/sdks/go/pkg/beam/io/fhirio/utils_test.go
new file mode 100644
index 00000000000..a8ffefd207c
--- /dev/null
+++ b/sdks/go/pkg/beam/io/fhirio/utils_test.go
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package fhirio
+
+import (
+       "errors"
+       "fmt"
+       "net/http"
+
+       "github.com/apache/beam/sdks/v2/go/pkg/beam"
+)
+
+var (
+       fakeRequestReturnErrorMessage = "internal error"
+       requestReturnErrorFakeClient  = &fakeFhirStoreClient{
+               fakeReadResources: func(resource string) (*http.Response, 
error) {
+                       return nil, errors.New(fakeRequestReturnErrorMessage)
+               },
+               fakeExecuteBundles: func(storePath string, bundle []byte) 
(*http.Response, error) {
+                       return nil, errors.New(fakeRequestReturnErrorMessage)
+               },
+       }
+
+       fakeBadStatus         = "403 Forbidden"
+       badStatusFakeResponse = &http.Response{Status: fakeBadStatus}
+       badStatusFakeClient   = &fakeFhirStoreClient{
+               fakeReadResources: func(resource string) (*http.Response, 
error) {
+                       return badStatusFakeResponse, nil
+               },
+               fakeExecuteBundles: func(storePath string, bundle []byte) 
(*http.Response, error) {
+                       return badStatusFakeResponse, nil
+               },
+       }
+
+       fakeBodyReaderErrorMessage  = "ReadAll fail"
+       bodyReaderErrorFakeResponse = &http.Response{
+               Body: &fakeReaderCloser{
+                       fakeRead: func([]byte) (int, error) {
+                               return 0, errors.New(fakeBodyReaderErrorMessage)
+                       },
+               }, Status: "200 Ok"}
+       bodyReaderErrorFakeClient = &fakeFhirStoreClient{
+               fakeReadResources: func(resource string) (*http.Response, 
error) {
+                       return bodyReaderErrorFakeResponse, nil
+               },
+               fakeExecuteBundles: func(storePath string, bundle []byte) 
(*http.Response, error) {
+                       return bodyReaderErrorFakeResponse, nil
+               },
+       }
+)
+
+type fakeFhirStoreClient struct {
+       fakeReadResources  func(string) (*http.Response, error)
+       fakeExecuteBundles func(storePath string, bundle []byte) 
(*http.Response, error)
+}
+
+func (c *fakeFhirStoreClient) executeBundle(storePath string, bundle []byte) 
(*http.Response, error) {
+       return c.fakeExecuteBundles(storePath, bundle)
+}
+
+func (c *fakeFhirStoreClient) readResource(resourcePath string) 
(*http.Response, error) {
+       return c.fakeReadResources(resourcePath)
+}
+
+// Useful to fake the Body of a http.Response.
+type fakeReaderCloser struct {
+       fakeRead func([]byte) (int, error)
+}
+
+func (*fakeReaderCloser) Close() error {
+       return nil
+}
+
+func (m *fakeReaderCloser) Read(b []byte) (int, error) {
+       return m.fakeRead(b)
+}
+
+func validateResourceErrorCounter(pipelineResult beam.PipelineResult, 
expectedCount int) error {
+       counterResults := pipelineResult.Metrics().AllMetrics().Counters()
+       if len(counterResults) != 1 {
+               return fmt.Errorf("counterResults got length %v, expected %v", 
len(counterResults), 1)
+       }
+       counterResult := counterResults[0]
+
+       expectedCounterName := "fhirio/resource_error_count"
+       if counterResult.Name() != expectedCounterName {
+               return fmt.Errorf("counterResult.Name() is '%v', expected 
'%v'", counterResult.Name(), expectedCounterName)
+       }
+
+       if counterResult.Result() != int64(expectedCount) {
+               return fmt.Errorf("counterResult.Result() is %v, expected %v", 
counterResult.Result(), expectedCount)
+       }
+       return nil
+}
diff --git a/sdks/go/test/integration/io/fhirio/fhirio_test.go 
b/sdks/go/test/integration/io/fhirio/fhirio_test.go
index 538d1c6b78f..560e74f4522 100644
--- a/sdks/go/test/integration/io/fhirio/fhirio_test.go
+++ b/sdks/go/test/integration/io/fhirio/fhirio_test.go
@@ -24,6 +24,7 @@ import (
        "flag"
        "fmt"
        "math/big"
+       "net/http"
        "os"
        "strconv"
        "strings"
@@ -62,11 +63,20 @@ func checkFlags(t *testing.T) {
        }
 }
 
+func setupFhirStoreWithData(t *testing.T) (string, []string, func()) {
+       return setupFhirStore(t, true)
+}
+
+func setupEmptyFhirStore(t *testing.T) (string, func()) {
+       storePath, _, teardownFunc := setupFhirStore(t, false)
+       return storePath, teardownFunc
+}
+
 // Sets up a test fhir store by creating and populating data to it for testing
 // purposes. It returns the name of the created store path, a slice of the
 // resource paths to be used in tests, and a function to teardown what has been
 // set up.
-func setupFhirStore(t *testing.T) (string, []string, func()) {
+func setupFhirStore(t *testing.T, shouldPopulateStore bool) (string, []string, 
func()) {
        t.Helper()
        if storeService == nil || storeManagementService == nil {
                t.Fatal("Healthcare Services were not initialized")
@@ -75,13 +85,16 @@ func setupFhirStore(t *testing.T) (string, []string, 
func()) {
        healthcareDataset := fmt.Sprintf(datasetPathFmt, *gcpopts.Project, 
*gcpopts.Region)
        createdFhirStore, err := createStore(healthcareDataset)
        if err != nil {
-               t.Fatal("Test store failed to be created")
+               t.Fatalf("Test store failed to be created. Reason: %v", 
err.Error())
        }
        createdFhirStorePath := createdFhirStore.Name
 
-       resourcePaths := populateStore(createdFhirStorePath)
-       if len(resourcePaths) == 0 {
-               t.Fatal("No data got populated to test")
+       var resourcePaths []string
+       if shouldPopulateStore {
+               resourcePaths = populateStore(createdFhirStorePath)
+               if len(resourcePaths) == 0 {
+                       t.Fatal("No data got populated to test")
+               }
        }
 
        return createdFhirStorePath, resourcePaths, func() {
@@ -168,7 +181,7 @@ func TestFhirIO_Read(t *testing.T) {
        integration.CheckFilters(t)
        checkFlags(t)
 
-       _, testResourcePaths, teardownFhirStore := setupFhirStore(t)
+       _, testResourcePaths, teardownFhirStore := setupFhirStoreWithData(t)
        defer teardownFhirStore()
 
        p, s, resourcePaths := ptest.CreateList(testResourcePaths)
@@ -183,7 +196,7 @@ func TestFhirIO_InvalidRead(t *testing.T) {
        integration.CheckFilters(t)
        checkFlags(t)
 
-       fhirStorePath, _, teardownFhirStore := setupFhirStore(t)
+       fhirStorePath, _, teardownFhirStore := setupFhirStoreWithData(t)
        defer teardownFhirStore()
 
        invalidResourcePath := fhirStorePath + "/fhir/Patient/invalid"
@@ -192,12 +205,29 @@ func TestFhirIO_InvalidRead(t *testing.T) {
        passert.Count(s, failedReads, "", 1)
        passert.Empty(s, resources)
        passert.True(s, failedReads, func(errorMsg string) bool {
-               return strings.Contains(errorMsg, "bad status [404]")
+               return strings.Contains(errorMsg, 
strconv.Itoa(http.StatusNotFound))
        })
 
        ptest.RunAndValidate(t, p)
 }
 
+func TestFhirIO_ExecuteBundles(t *testing.T) {
+       integration.CheckFilters(t)
+       checkFlags(t)
+
+       fhirStorePath, teardownFhirStore := setupEmptyFhirStore(t)
+       defer teardownFhirStore()
+
+       p, s, bundles := ptest.CreateList(readPrettyBundles())
+       successBodies, failures := fhirio.ExecuteBundles(s, fhirStorePath, 
bundles)
+       passert.Count(s, successBodies, "", 2)
+       passert.Count(s, failures, "", 2)
+       passert.True(s, failures, func(errorMsg string) bool {
+               return strings.Contains(errorMsg, 
strconv.Itoa(http.StatusBadRequest))
+       })
+       ptest.RunAndValidate(t, p)
+}
+
 func TestMain(m *testing.M) {
        flag.Parse()
        beam.Init()

Reply via email to