lastomato commented on a change in pull request #12721:
URL: https://github.com/apache/beam/pull/12721#discussion_r482357797
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -122,12 +124,24 @@
* store) This requires each resource to contain a client provided ID. It is
important that when
* using import you give the appropriate permissions to the Google Cloud
Healthcare Service Agent.
*
+ * <p>ExportGcs This is to export FHIR resources from a FHIR store to Google
Cloud Storage. The
Review comment:
I think we should rename this to Export since GCS is an implementation
detail here. Feel free to leave it to a follow-up PR.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -170,6 +184,21 @@
* // Alternatively you could use import for high throughput to a new store.
* FhirIO.Write.Result writeResult =
* output.apply("Import FHIR Resources",
FhirIO.executeBundles(options.getNewFhirStore()));
+ *
+ * // Export FHIR resources to Google Cloud Storage.
+ * String fhirStoreName = ...;
+ * String exportGcsUriPrefix = ...;
+ * FhirIO.ExportGcs.Result exportResult =
Review comment:
Should we return export errors in the `Result`?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1212,10 +1280,7 @@ public void executeBundles(ProcessContext context) {
public static class ExportGcs extends PTransform<PBegin, ExportGcs.Result> {
public static final TupleTag<String> OUT = new TupleTag<String>() {};
- /**
- * Represents the result of an export, including both the successful
parsed messages, and
- * invalid ones.
- */
+ /** Represents the result of an export, a collection of FHIR resources in
ndjson. */
Review comment:
nit: ndjson -> newline-delimited JSON (ndjson)
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1297,4 +1362,67 @@ public void exportResourcesToGcs(ProcessContext context)
}
}
}
+
+ /** Deidentify FHIR resources from a FHIR store to a destination FHIR store
*/
+ public static class Deidentify extends PTransform<PBegin,
PCollection<String>> {
+ private final ValueProvider<String> sourceFhirStore;
+ private final ValueProvider<String> destinationFhirStore;
+ private final ValueProvider<DeidentifyConfig> deidConfig;
+
+ public Deidentify(
+ ValueProvider<String> sourceFhirStore,
+ ValueProvider<String> destinationFhirStore,
+ ValueProvider<DeidentifyConfig> deidConfig) {
+ this.sourceFhirStore = sourceFhirStore;
+ this.destinationFhirStore = destinationFhirStore;
+ this.deidConfig = deidConfig;
+ }
+
+ @Override
+ public PCollection<String> expand(PBegin input) {
+ return input
+ .getPipeline()
+ .apply(Create.ofProvider(sourceFhirStore, StringUtf8Coder.of()))
+ .apply(
+ "ScheduleDeidentifyFhirStoreOperations",
+ ParDo.of(new DeidentifyFn(destinationFhirStore, deidConfig)));
+ }
+
+ /** A function that schedules a deidentify operation and monitors the
status. */
+ public static class DeidentifyFn extends DoFn<String, String> {
+
+ private HealthcareApiClient client;
+ private final ValueProvider<String> destinationFhirStore;
+ private final String deidConfigJson;
+
+ public DeidentifyFn(
+ ValueProvider<String> destinationFhirStore,
ValueProvider<DeidentifyConfig> deidConfig) {
+ this.destinationFhirStore = destinationFhirStore;
+ Gson g = new Gson();
+ this.deidConfigJson = g.toJson(deidConfig.get());
+ }
+
+ @Setup
+ public void initClient() throws IOException {
+ this.client = new HttpHealthcareApiClient();
+ }
+
+ @ProcessElement
+ public void deidentify(ProcessContext context)
+ throws IOException, InterruptedException, HealthcareHttpException {
+ String sourceFhirStore = context.element();
+ String destinationFhirStore = this.destinationFhirStore.get();
+ Gson g = new Gson();
Review comment:
Please cache this object; otherwise, we will create a new `Gson` object for
each element.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
##########
@@ -1297,4 +1362,67 @@ public void exportResourcesToGcs(ProcessContext context)
}
}
}
+
+ /** Deidentify FHIR resources from a FHIR store to a destination FHIR store
*/
+ public static class Deidentify extends PTransform<PBegin,
PCollection<String>> {
+ private final ValueProvider<String> sourceFhirStore;
+ private final ValueProvider<String> destinationFhirStore;
+ private final ValueProvider<DeidentifyConfig> deidConfig;
+
+ public Deidentify(
+ ValueProvider<String> sourceFhirStore,
+ ValueProvider<String> destinationFhirStore,
+ ValueProvider<DeidentifyConfig> deidConfig) {
+ this.sourceFhirStore = sourceFhirStore;
+ this.destinationFhirStore = destinationFhirStore;
+ this.deidConfig = deidConfig;
+ }
+
+ @Override
+ public PCollection<String> expand(PBegin input) {
+ return input
+ .getPipeline()
+ .apply(Create.ofProvider(sourceFhirStore, StringUtf8Coder.of()))
+ .apply(
+ "ScheduleDeidentifyFhirStoreOperations",
+ ParDo.of(new DeidentifyFn(destinationFhirStore, deidConfig)));
+ }
+
+ /** A function that schedules a deidentify operation and monitors the
status. */
+ public static class DeidentifyFn extends DoFn<String, String> {
+
+ private HealthcareApiClient client;
+ private final ValueProvider<String> destinationFhirStore;
+ private final String deidConfigJson;
+
+ public DeidentifyFn(
+ ValueProvider<String> destinationFhirStore,
ValueProvider<DeidentifyConfig> deidConfig) {
+ this.destinationFhirStore = destinationFhirStore;
+ Gson g = new Gson();
+ this.deidConfigJson = g.toJson(deidConfig.get());
Review comment:
Is DeidentifyConfig not serializable?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org