[ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096961#comment-17096961
 ] 

Jacob Ferriero edited comment on BEAM-9856 at 4/30/20, 9:59 PM:
----------------------------------------------------------------

The existing GA [HL7v2 Messages.List 
API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list]
 allows us to specify a filter and order by sendTime. We should be able to use 
this as our restriction dimension to make this a splitable DoFn.

I will investigate feasibility of this.


Basic Design Proposal:
Each Messages.List query will have a sendTime filter based on it's restriction 
and orderBy sendTime in order closely mimic the 
[OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html]
 pattern.

* getInitialRestriction: run a query against HL7v2 store to get earliest 
sendTime 
* RestrictionTracker: keep a "watermark" based on sendTime
* splitRestriction: split timestamp range based on fraction (this will require 
an existing HL7v2 Message List call can be notified to "stop paginating through 
the at an arbitrary sendTime". Note that each List query might not get to the 
end of it's sendTime filter (due to splitting). Instead it should just spot 
processing results when it encounters are record past the end of it's 
restriction.

This should allow us to more eagerly emit results as certain restrictions are 
completed.


Open Questions:
* Is there a canonical way of specifying more than one initial restriction 
based on assumption that you know for most use cases you'll want to split at 
least n-times upfront (e.g. partition by day/hour to start and dynamically 
split from there) ? Is this an anti-pattern because
* Should the initial restriction have a endTime? Should this be an optional 
user parameter for the transform? What should the default be (e.g. 
Instant.now() just before firing the first query)? or Should the ListMessages 
just scroll until it is "caught up" there are no newer messages?


Jake's two cents:
I consider this List Messages transform to primarily serve a batch / bounded 
backfill or replay use case. I believe real-time use cases should use the Pub 
Sub notifications for event driven / streaming updates from the HL7v2 store 
with HL7v2IO.readAll() (as this allows for much greater parallelization and is 
more likely to "keep up" during higher throughput). However, there is nothing 
stopping a user from using ListMessages against a store that is still being 
updated. If this becomes a splittable DoFn that fires many ListMessages 
throughtout it's life time and our initial restriction is [minSendTime,  inf), 
this could become an unbounded source (or more specifically unbounded per 
element). I feel we should force ListMessages to be bounded per element by 
always having a endSendTime.


was (Author: data-runner0):
The existing GA [HL7v2 Messages.List 
API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list]
 allows us to specify a filter and order by createTime. We should be able to 
use this as our restriction dimension to make this a splitable DoFn.

I will investigate feasibility of this.


Basic Design Proposal:
Each Messages.List query will have a createTime filter based on it's 
restriction and orderBy createTime in order closely mimic the 
[OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html]
 pattern.

* getInitialRestriction: run a query against HL7v2 store to get earliest 
createTime 
* RestrictionTracker: keep a "watermark" based on createTime
* splitRestriction: split timestamp range based on fraction (this will require 
an existing HL7v2 Message List call can be notified to "stop paginating through 
the at an arbitrary createTime". Note that each List query might not get to the 
end of it's createTime filter (due to splitting). Instead it should just spot 
processing results when it encounters are record past the end of it's 
restriction.

This should allow us to more eagerly emit results as certain restrictions are 
completed.


Open Questions:
* Is there a canonical way of specifying more than one initial restriction 
based on assumption that you know for most use cases you'll want to split at 
least n-times upfront (e.g. partition by day/hour to start and dynamically 
split from there) ? Is this an anti-pattern because
* Should the initial restriction have a endTime? Should this be an optional 
user parameter for the transform? What should the default be (e.g. 
Instant.now() just before firing the first query)? or Should the ListMessages 
just scroll until it is "caught up" there are no newer messages?


Jake's two cents:
I consider this List Messages transform to primarily serve a batch / bounded 
backfill or replay use case. I believe real-time use cases should use the Pub 
Sub notifications for event driven / streaming updates from the HL7v2 store 
with HL7v2IO.readAll() (as this allows for much greater parallelization and is 
more likely to "keep up" during higher throughput). However, there is nothing 
stopping a user from using ListMessages against a store that is still being 
updated. If this becomes a splittable DoFn that fires many ListMessages 
throughtout it's life time and our initial restriction is [minCreateTime,  
inf), this could become an unbounded source (or more specifically unbounded per 
element). I feel we should force ListMessages to be bounded per element by 
always having a endCreateTime.

> HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-9856
>                 URL: https://issues.apache.org/jira/browse/BEAM-9856
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>            Reporter: Jacob Ferriero
>            Assignee: Jacob Ferriero
>            Priority: Minor
>
> Currently the List Messages API paginates through in a single ProcessElement 
> Call.
> However we could get a restriction based on createTime using Messages.List 
> filter and orderby.
>  
> This is inline with the future roadmap of  HL7v2 bulk export API becomes 
> available that should allow splitting on (e.g. create time dimension). 
> Leveraging this bulk export might be  a future optimization to explore.
>  
> This could take one of two forms:
> 1. dyanmically splitable via splitable DoFn (sexy, beam idiomatic: make 
> optimization the runner's problem, potentially unnecessarily complex for this 
> use case )
> 2. static splitting on some time partition e.g. finding the earliest 
> createTime and emitting a PCollection of 1 hour partitions and paginating 
> through each hour of data w/ in the time frame that the store spans, in a 
> separate ProcessElement. (easy to implement but will likely have hot keys / 
> stragglers based on "busy hours")
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to