This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 2ea61b7 Revert "Merge pull request #14691 from Add PatchResources to FhirIO." new d1cede0 Merge pull request #14816 from [BEAM-12310] Revert "Merge pull request #14691 from Add PatchResources to FhirIO." 2ea61b7 is described below commit 2ea61b722894f56eab15aca1cd428ca8f3c41be6 Author: Boyuan Zhang <boyu...@google.com> AuthorDate: Fri May 14 12:25:18 2021 -0700 Revert "Merge pull request #14691 from Add PatchResources to FhirIO." This reverts commit e37bedea --- .../apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 129 +--------------- .../sdk/io/gcp/healthcare/FhirSearchParameter.java | 15 -- .../sdk/io/gcp/healthcare/HealthcareApiClient.java | 18 +-- .../io/gcp/healthcare/HttpHealthcareApiClient.java | 63 +------- .../beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java | 171 --------------------- .../beam/sdk/io/gcp/healthcare/FhirIOTest.java | 23 --- 6 files changed, 9 insertions(+), 410 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 7fd44ce..5bc73ce 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -28,7 +28,6 @@ import com.google.auto.value.AutoValue; import com.google.gson.Gson; import com.google.gson.JsonArray; import java.io.IOException; -import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; @@ -59,7 +58,6 @@ import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.io.fs.ResourceIdCoder; -import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input; import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException; import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; import org.apache.beam.sdk.metrics.Counter; @@ -67,8 +65,6 @@ import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; -import org.apache.beam.sdk.schemas.AutoValueSchema; -import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupIntoBatches; @@ -105,19 +101,19 @@ import org.slf4j.LoggerFactory; * <h3>Reading</h3> * * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use cases where you have a - * ${@link PCollection} of FHIR resource names in the format of projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}. This is appropriate for reading the Fhir notifications from + * ${@link PCollection} of message IDs. This is appropriate for reading the Fhir notifications from * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually * prepared list of messages that you need to process (e.g. in a text file read with {@link * org.apache.beam.sdk.io.TextIO}*) . * - * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of FHIR resource name strings + * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings * {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieve a - * {@link PCollection} containing the successfully fetched json resources as {@link String}s and/or {@link + * {@link PCollection} containing the successfully fetched {@link String}s and/or {@link * FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of {@link - * HealthcareIOError}* containing the resources that could not be fetched and the exception as a + * HealthcareIOError}* containing the resource ID that could not be fetched and the exception as a * {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your * choosing. This error handling is mainly to transparently surface errors where the upstream {@link - * PCollection}* contains FHIR resources that are not valid or are not reachable due to permissions issues. + * PCollection}* contains IDs that are not valid or are not reachable due to permissions issues. * * <h3>Writing</h3> * @@ -387,16 +383,6 @@ public class FhirIO { } /** - * Patch FHIR resources, @see <a - * href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/patch></a>. - * - * @return the patch - */ - public static PatchResources patchResources() { - return new PatchResources(); - } - - /** * Increments success and failure counters for an LRO. To be used after the LRO has completed. * This function leverages the fact that the LRO metadata is always of the format: "counter": { * "success": "1", "failure": "1" } @@ -1344,7 +1330,6 @@ public class FhirIO { /** The type Execute bundles. */ public static class ExecuteBundles extends PTransform<PCollection<String>, Write.Result> { - private final ValueProvider<String> fhirStore; /** @@ -1432,109 +1417,8 @@ public class FhirIO { } } - /** The type Patch resources. */ - public static class PatchResources extends PTransform<PCollection<Input>, Write.Result> { - - private PatchResources() {} - - /** Represents the input parameters for a single FHIR patch request. */ - @DefaultSchema(AutoValueSchema.class) - @AutoValue - abstract static class Input implements Serializable { - abstract String getResourceName(); - - abstract String getPatch(); - - abstract @Nullable Map<String, String> getQuery(); - - static Builder builder() { - return new AutoValue_FhirIO_PatchResources_Input.Builder(); - } - - @AutoValue.Builder - abstract static class Builder { - abstract Builder setResourceName(String resourceName); - - abstract Builder setPatch(String patch); - - abstract Builder setQuery(Map<String, String> query); - - abstract Input build(); - } - } - - @Override - public FhirIO.Write.Result expand(PCollection<Input> input) { - int numShards = 10; - int batchSize = 10000; - PCollectionTuple bodies = - // Shard input into batches to improve worker performance. - input - .apply( - "Shard input", - WithKeys.of(elm -> ThreadLocalRandom.current().nextInt(0, numShards))) - .setCoder(KvCoder.of(TextualIntegerCoder.of(), input.getCoder())) - .apply("Assemble batches", GroupIntoBatches.ofSize(batchSize)) - .setCoder(KvCoder.of(TextualIntegerCoder.of(), IterableCoder.of(input.getCoder()))) - .apply( - ParDo.of(new PatchResourcesFn()) - .withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY))); - bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of()); - bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); - return Write.Result.in(input.getPipeline(), bodies); - } - - /** The type Write Fhir fn. */ - static class PatchResourcesFn extends DoFn<KV<Integer, Iterable<Input>>, String> { - - private static final Counter PATCH_RESOURCES_ERRORS = - Metrics.counter( - PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_error_count"); - private static final Counter PATCH_RESOURCES_SUCCESS = - Metrics.counter( - PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_success_count"); - private static final Distribution PATCH_RESOURCES_LATENCY_MS = - Metrics.distribution( - PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_latency_ms"); - - private transient HealthcareApiClient client; - private final ObjectMapper mapper = new ObjectMapper(); - - /** - * Initialize healthcare client. - * - * @throws IOException If the Healthcare client cannot be created. - */ - @Setup - public void initClient() throws IOException { - this.client = new HttpHealthcareApiClient(); - } - - @ProcessElement - public void patchResources(ProcessContext context) { - Iterable<Input> batch = context.element().getValue(); - for (Input patchParameter : batch) { - try { - long startTime = Instant.now().toEpochMilli(); - client.patchFhirResource( - patchParameter.getResourceName(), - patchParameter.getPatch(), - patchParameter.getQuery()); - PATCH_RESOURCES_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime); - PATCH_RESOURCES_SUCCESS.inc(); - context.output(Write.SUCCESSFUL_BODY, patchParameter.toString()); - } catch (IOException | HealthcareHttpException e) { - PATCH_RESOURCES_ERRORS.inc(); - context.output(Write.FAILED_BODY, HealthcareIOError.of(patchParameter.toString(), e)); - } - } - } - } - } - /** Export FHIR resources from a FHIR store to new line delimited json files on GCS. */ public static class Export extends PTransform<PBegin, PCollection<String>> { - private final ValueProvider<String> fhirStore; private final ValueProvider<String> exportGcsUriPrefix; @@ -1597,7 +1481,6 @@ public class FhirIO { /** Deidentify FHIR resources from a FHIR store to a destination FHIR store. */ public static class Deidentify extends PTransform<PBegin, PCollection<String>> { - private final ValueProvider<String> sourceFhirStore; private final ValueProvider<String> destinationFhirStore; private final ValueProvider<DeidentifyConfig> deidConfig; @@ -1668,7 +1551,6 @@ public class FhirIO { /** The type Search. */ public static class Search<T> extends PTransform<PCollection<FhirSearchParameter<T>>, FhirIO.Search.Result> { - private static final Logger LOG = LoggerFactory.getLogger(Search.class); private final ValueProvider<String> fhirStore; @@ -1682,7 +1564,6 @@ public class FhirIO { } public static class Result implements POutput, PInput { - private PCollection<KV<String, JsonArray>> keyedResources; private PCollection<JsonArray> resources; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java index db34d33..6f80d86 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java @@ -46,26 +46,11 @@ public class FhirSearchParameter<T> { this.queries = queries; } - /** - * Creates a FhirSearchParameter to represent a FHIR Search request. - * - * @param resourceType resource type for search, leave empty for all - * @param key optional key to index searches by - * @param queries search query, with field as key and search as value - * @return FhirSearchParameter - */ public static <T> FhirSearchParameter<T> of( String resourceType, @Nullable String key, @Nullable Map<String, T> queries) { return new FhirSearchParameter<>(resourceType, key, queries); } - /** - * Creates a FhirSearchParameter to represent a FHIR Search request. - * - * @param resourceType resource type for search, leave empty for all - * @param queries search query, with field as key and search as value - * @return FhirSearchParameter - */ public static <T> FhirSearchParameter<T> of( String resourceType, @Nullable Map<String, T> queries) { return new FhirSearchParameter<>(resourceType, null, queries); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java index 81b2a75..f4e38c4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java @@ -162,27 +162,13 @@ public interface HealthcareApiClient { throws IOException, HealthcareHttpException; /** - * Patch fhir resource http body. - * - * @param resourceName the resource name, in format - * projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}[/{id}], id not - * present when queryString is specified. - * @param patch the patch operation - * @param query optional query for conditional patches - * @return the http body - */ - HttpBody patchFhirResource(String resourceName, String patch, @Nullable Map<String, String> query) - throws IOException, HealthcareHttpException; - - /** * Read fhir resource http body. * - * @param resourceName the resource name, in format - * projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id} + * @param resourceId the resource * @return the http body * @throws IOException the io exception */ - HttpBody readFhirResource(String resourceName) throws IOException; + HttpBody readFhirResource(String resourceId) throws IOException; /** * Search fhir resource http body. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java index 929fe2a..283d011 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java @@ -100,7 +100,6 @@ import org.slf4j.LoggerFactory; "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) public class HttpHealthcareApiClient implements HealthcareApiClient, Serializable { - private static final String USER_AGENT = String.format( "apache-beam-io-google-cloud-platform-healthcare/%s", @@ -108,7 +107,6 @@ public class HttpHealthcareApiClient implements HealthcareApiClient, Serializabl private static final String FHIRSTORE_HEADER_CONTENT_TYPE = "application/fhir+json"; private static final String FHIRSTORE_HEADER_ACCEPT = "application/fhir+json; charset=utf-8"; private static final String FHIRSTORE_HEADER_ACCEPT_CHARSET = "utf-8"; - private static final String FHIRSTORE_PATCH_CONTENT_TYPE = "application/json-patch+json"; private static final Logger LOG = LoggerFactory.getLogger(HttpHealthcareApiClient.class); private transient CloudHealthcare client; private transient HttpClient httpClient; @@ -596,61 +594,11 @@ public class HttpHealthcareApiClient implements HealthcareApiClient, Serializabl return responseModel; } - @Override - public HttpBody patchFhirResource( - String resourceName, String patch, @Nullable Map<String, String> query) - throws IOException, HealthcareHttpException { - if (httpClient == null || client == null) { - initClient(); - } - - credentials.refreshIfExpired(); - StringEntity requestEntity = new StringEntity(patch, ContentType.APPLICATION_JSON); - URI uri; - try { - URIBuilder uriBuilder = new URIBuilder(client.getRootUrl() + "v1beta1/" + resourceName); - if (query != null) { - for (Map.Entry<String, String> q : query.entrySet()) { - uriBuilder.addParameter(q.getKey(), q.getValue()); - } - } - uri = uriBuilder.build(); - } catch (URISyntaxException e) { - LOG.error("URL error when making patch request to FHIR API. " + e.getMessage()); - throw new IllegalArgumentException(e); - } - - RequestBuilder requestBuilder = - RequestBuilder.patch() - .setUri(uri) - .setEntity(requestEntity) - .addHeader("Authorization", "Bearer " + credentials.getAccessToken().getTokenValue()) - .addHeader("User-Agent", USER_AGENT) - .addHeader("Content-Type", FHIRSTORE_PATCH_CONTENT_TYPE) - .addHeader("Accept", FHIRSTORE_HEADER_ACCEPT) - .addHeader("Accept-Charset", FHIRSTORE_HEADER_ACCEPT_CHARSET); - - HttpUriRequest request = requestBuilder.build(); - HttpResponse response = httpClient.execute(request); - HttpEntity responseEntity = response.getEntity(); - String content = EntityUtils.toString(responseEntity); - - // Check 2XX code. - int statusCode = response.getStatusLine().getStatusCode(); - if (!(statusCode / 100 == 2)) { - throw HealthcareHttpException.of(statusCode, content); - } - HttpBody responseModel = new HttpBody(); - responseModel.setData(content); - return responseModel; - } - /** * Wraps {@link HttpResponse} in an exception with a statusCode field for use with {@link * HealthcareIOError}. */ public static class HealthcareHttpException extends Exception { - private final int statusCode; private HealthcareHttpException(int statusCode, String message) { @@ -682,15 +630,8 @@ public class HttpHealthcareApiClient implements HealthcareApiClient, Serializabl } @Override - public HttpBody readFhirResource(String resourceName) throws IOException { - return client - .projects() - .locations() - .datasets() - .fhirStores() - .fhir() - .read(resourceName) - .execute(); + public HttpBody readFhirResource(String resourceId) throws IOException { + return client.projects().locations().datasets().fhirStores().fhir().read(resourceId).execute(); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java deleted file mode 100644 index 835d717..0000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.healthcare; - -import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE; - -import com.google.api.services.healthcare.v1beta1.model.HttpBody; -import java.io.IOException; -import java.security.SecureRandom; -import java.util.Arrays; -import java.util.Collection; -import org.apache.beam.runners.direct.DirectOptions; -import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; - -@RunWith(Parameterized.class) -public class FhirIOPatchIT { - - public String version; - - @Parameters(name = "{0}") - public static Collection<String> versions() { - return Arrays.asList("R4"); - } - - @Rule public transient TestPipeline pipeline = TestPipeline.create(); - - private final String project; - private transient HealthcareApiClient client; - private static String healthcareDataset; - private String fhirStoreId; - private String resourceName; - private static final String BASE_STORE_ID = - "FHIR_store_patch_it_" + System.currentTimeMillis() + "_" + (new SecureRandom().nextInt(32)); - - public FhirIOPatchIT(String version) { - this.version = version; - this.fhirStoreId = BASE_STORE_ID + version; - this.project = - TestPipeline.testingPipelineOptions() - .as(HealthcareStoreTestPipelineOptions.class) - .getStoreProjectId(); - } - - @Before - public void setup() throws Exception { - healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project); - if (client == null) { - this.client = new HttpHealthcareApiClient(); - } - client.createFhirStore(healthcareDataset, fhirStoreId, version, ""); - - resourceName = healthcareDataset + "/fhirStores/" + fhirStoreId + "/fhir/Patient/123"; - String bundle = - "{\"resourceType\":\"Bundle\"," - + "\"type\":\"transaction\"," - + "\"entry\": [{" - + "\"request\":{\"method\":\"PUT\",\"url\":\"Patient/123\"}," - + "\"resource\":{\"resourceType\":\"Patient\",\"id\":\"123\",\"birthDate\": \"1990-01-01\"}" - + "}]}"; - FhirIOTestUtil.executeFhirBundles( - client, healthcareDataset + "/fhirStores/" + fhirStoreId, ImmutableList.of(bundle)); - } - - @After - public void teardown() throws IOException { - HealthcareApiClient client = new HttpHealthcareApiClient(); - for (String version : versions()) { - client.deleteFhirStore(healthcareDataset + "/fhirStores/" + BASE_STORE_ID + version); - } - } - - @Test - public void testFhirIOPatch() throws IOException { - pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false); - - Input patchParameter = - Input.builder() - .setResourceName(resourceName) - .setPatch( - "[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-05-23\"}]") - .build(); - String expectedSuccessBody = patchParameter.toString(); - - // Execute patch. - PCollection<Input> patches = pipeline.apply(Create.of(patchParameter)); - FhirIO.Write.Result result = patches.apply(FhirIO.patchResources()); - - // Validate beam results. - PAssert.that(result.getFailedBodies()).empty(); - PCollection<String> successfulBodies = result.getSuccessfulBodies(); - PAssert.that(successfulBodies) - .satisfies( - input -> { - for (String body : input) { - Assert.assertEquals(expectedSuccessBody, body); - } - return null; - }); - - pipeline.run().waitUntilFinish(); - - // Validate FHIR store contents. - HttpBody readResult = client.readFhirResource(resourceName); - Assert.assertEquals("1997-05-23", readResult.get("birthDate")); - } - - @Test - public void testFhirIOPatch_ifMatch() throws IOException { - pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false); - - Input patchParameter = - Input.builder() - .setResourceName(healthcareDataset + "/fhirStores/" + fhirStoreId + "/fhir/Patient") - .setPatch( - "[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-06-23\"}]") - .setQuery(ImmutableMap.of("birthDate", "1990-01-01")) - .build(); - String expectedSuccessBody = patchParameter.toString(); - - // Execute patch. - PCollection<Input> patches = pipeline.apply(Create.of(patchParameter)); - FhirIO.Write.Result result = patches.apply(FhirIO.patchResources()); - - // Validate beam results. - PAssert.that(result.getFailedBodies()).empty(); - PCollection<String> successfulBodies = result.getSuccessfulBodies(); - PAssert.that(successfulBodies) - .satisfies( - input -> { - for (String body : input) { - Assert.assertEquals(expectedSuccessBody, body); - } - return null; - }); - - pipeline.run().waitUntilFinish(); - - // Validate FHIR store contents. - HttpBody readResult = client.readFhirResource(resourceName); - Assert.assertEquals("1997-06-23", readResult.get("birthDate")); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java index eecd371..8313a01 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.List; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; @@ -32,7 +31,6 @@ import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptors; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -135,27 +133,6 @@ public class FhirIOTest { pipeline.run(); } - @Test - public void test_FhirIO_failedPatch() { - Input badPatch = Input.builder().setPatch("").setResourceName("").build(); - PCollection<Input> patches = pipeline.apply(Create.of(ImmutableList.of(badPatch))); - FhirIO.Write.Result patchResult = patches.apply(FhirIO.patchResources()); - - PCollection<HealthcareIOError<String>> failedInserts = patchResult.getFailedBodies(); - - PAssert.thatSingleton(failedInserts) - .satisfies( - (HealthcareIOError<String> err) -> { - Assert.assertEquals(badPatch.toString(), err.getDataResource()); - return null; - }); - PCollection<Long> numFailedInserts = failedInserts.apply(Count.globally()); - - PAssert.thatSingleton(numFailedInserts).isEqualTo(1L); - - pipeline.run(); - } - private static final long NUM_ELEMENTS = 11; private static ArrayList<KV<String, String>> createTestData() {