[jira] [Created] (BEAM-10544) Select Types not equal with nested schema
Jacob Ferriero created BEAM-10544: - Summary: Select Types not equal with nested schema Key: BEAM-10544 URL: https://issues.apache.org/jira/browse/BEAM-10544 Project: Beam Issue Type: Bug Components: dsl-sql, sdk-java-core Reporter: Jacob Ferriero

When using SqlTransform to join a table with a large nested schema to a flat table, we get a "Types not equal" error from Select [1]. We are not able to get the test of our use of SqlTransform to pass with the direct runner. All code is checked into CSR [2].

Things of note: this uses the Calcite query planner.

Query (the real business logic was much more complex, but this is sufficient to reproduce the issue in our test):

```sql
SELECT
  t1.DeviceName AS DeviceName,
  t1.LinkName AS LinkName,
  t1.HostName AS HostName,
  t1.MeasuredAt AS MeasuredAt,
  t2.b_dBm AS b_dBm
FROM RealtimeRows AS t1
INNER JOIN -- BigQuery dimension side input
  TxPowerSideInput AS t2
ON t1.DeviceName = t2.DeviceName
```

Tables are created like so (in the test):

```java
// This table has the same schema as the real incoming Pub/Sub messages
// in the real world use case.
PCollection<Row> realtimeTestData = pipeline
    .apply("Read 1Hz staging",
        BigQueryIO.readTableRowsWithSchema()
            .fromQuery("SELECT * FROM `taara-db.jake_views.staging_sample_float`")
            .usingStandardSql())
    .apply(Convert.toRows());

PCollection<Row> txPowerCalcRows = pipeline
    .apply("Read Tx Power Calc Side Input",
        BigQueryIO.readTableRowsWithSchema()
            .fromQuery("SELECT * FROM `taara-db`.MANUFACTURING.tx_power_timeinvariant_calculations")
            .usingStandardSql())
    .apply(Convert.toRows());
```

Relevant Java snippet:

```java
PCollection<Row> out = tables
    .apply("Join to dimension Data",
        SqlTransform.query(sql)
            .registerUdf("POW", Pow.class)
            .registerUdf("SQRT", Sqrt.class)
            .registerUdf("LOG10", Log10.class)
            .registerUdf("GREATEST", Greatest.class)
            .registerUdf("EXTRACT_OFFSET", ExtractArrayOffset.class)
            .registerUdf("PARSE_TIMESTAMP", ParseTimestamp.class)
            .registerUdf("UNIX_SECONDS", UnixSeconds.class));
```

[1] https://github.com/apache/beam/blob/b564239081e9351c56fb0e7d263495b95dd3f8f3/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java#L203
[2] https://source.cloud.google.com/taara-db/pso-taara-realtime-margin/+/master:streaming-join/streaming-join/src/test/java/com/google/x/taara/dataflow/transforms/RxTxPowersCorrFERCombinedSqlTransformIT.java

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10479) UDF / UDAF support for ZetaSQLQueryPlanner
[ https://issues.apache.org/jira/browse/BEAM-10479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-10479: -- Issue Type: New Feature (was: Test)

> UDF / UDAF support for ZetaSQLQueryPlanner
> ------------------------------------------
>
> Key: BEAM-10479
> URL: https://issues.apache.org/jira/browse/BEAM-10479
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql-zetasql
> Reporter: Jacob Ferriero
> Priority: P2
>
> [BeamSqlDslUdfUdafTest|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java]
> should be refactored so each test runs against all supported query planners
> (namely Calcite and ZetaSQL).
> This could be achieved without code duplication by using parameterized tests
> and having each test run with both query planners (and easily support adding
> more query planners in the future if necessary).
> {code:java}
> import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner;
> import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
>
> @RunWith(Parameterized.class)
> public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
>   public Class<? extends QueryPlanner> planner;
>
>   @Parameters(name = "{0}")
>   public static Collection<Class<? extends QueryPlanner>> planners() {
>     return Arrays.asList(
>         CalciteQueryPlanner.class,
>         ZetaSQLQueryPlanner.class);
>   }
>
>   BeamSqlDslUdfUdafTest(Class<? extends QueryPlanner> planner) {
>     this.planner = planner;
>   }
>
>   // TODO: refactor each test that runs SqlTransform::query to use
>   // SqlTransform::withQueryPlannerClass(this.planner)
>   ...
> }
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10479) UDF / UDAF support for ZetaSQLQueryPlanner
[ https://issues.apache.org/jira/browse/BEAM-10479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-10479: -- Summary: UDF / UDAF support for ZetaSQLQueryPlanner (was: UDF / UDAF support should be tested against ZetaSQLQueryPlanner)

> UDF / UDAF support for ZetaSQLQueryPlanner
> ------------------------------------------
>
> Key: BEAM-10479
> URL: https://issues.apache.org/jira/browse/BEAM-10479
> Project: Beam
> Issue Type: Test
> Components: dsl-sql-zetasql
> Reporter: Jacob Ferriero
> Priority: P2
>
> [BeamSqlDslUdfUdafTest|https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java]
> should be refactored so each test runs against all supported query planners
> (namely Calcite and ZetaSQL).
> This could be achieved without code duplication by using parameterized tests
> and having each test run with both query planners (and easily support adding
> more query planners in the future if necessary).
> {code:java}
> import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner;
> import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
>
> @RunWith(Parameterized.class)
> public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
>   public Class<? extends QueryPlanner> planner;
>
>   @Parameters(name = "{0}")
>   public static Collection<Class<? extends QueryPlanner>> planners() {
>     return Arrays.asList(
>         CalciteQueryPlanner.class,
>         ZetaSQLQueryPlanner.class);
>   }
>
>   BeamSqlDslUdfUdafTest(Class<? extends QueryPlanner> planner) {
>     this.planner = planner;
>   }
>
>   // TODO: refactor each test that runs SqlTransform::query to use
>   // SqlTransform::withQueryPlannerClass(this.planner)
>   ...
> }
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10488) ZetaSQL OFFSET support for retrieving Array Values
Jacob Ferriero created BEAM-10488: - Summary: ZetaSQL OFFSET support for retrieving Array Values Key: BEAM-10488 URL: https://issues.apache.org/jira/browse/BEAM-10488 Project: Beam Issue Type: Test Components: dsl-sql-zetasql Reporter: Jacob Ferriero

Without OFFSET support, arrays are un-parseable for anything but passing through a simple SELECT with the ZetaSQLPlanner; for example, an element access such as arr[OFFSET(0)] cannot be used. https://github.com/google/zetasql/blob/master/docs/arrays.md#accessing-array-elements

-- This message was sent by Atlassian Jira (v8.3.4#803005)
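[Editor's note: the ZetaSQL array-access semantics referenced above (see the linked arrays doc) can be illustrated with a plain-Java sketch. This is not Beam code; it only shows the behavior a planner would need to support: OFFSET is zero-based, ORDINAL is one-based, and an out-of-range index is an error rather than NULL.]

```java
import java.util.List;

public class ArrayOffsetSemantics {
    // Sketch of ZetaSQL's arr[OFFSET(i)]: zero-based; out of range is an error.
    static <T> T offset(List<T> arr, int i) {
        if (i < 0 || i >= arr.size()) {
            throw new IndexOutOfBoundsException("OFFSET(" + i + ") out of range");
        }
        return arr.get(i);
    }

    // Sketch of ZetaSQL's arr[ORDINAL(i)]: one-based.
    static <T> T ordinal(List<T> arr, int i) {
        return offset(arr, i - 1);
    }
}
```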
[jira] [Created] (BEAM-10479) UDF / UDAF support should be tested against ZetaSQLQueryPlanner
Jacob Ferriero created BEAM-10479: - Summary: UDF / UDAF support should be tested against ZetaSQLQueryPlanner Key: BEAM-10479 URL: https://issues.apache.org/jira/browse/BEAM-10479 Project: Beam Issue Type: Test Components: dsl-sql-zetasql Reporter: Jacob Ferriero [BeamSqlDslUdfUdafTest | https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java] should be refactored so each test runs against all supported Query Planners (namely Calcite and Zeta SQL). This could be achieved without code duplication by using Parameterized tests and having each test run with both query planners (and easily support adding more QueryPlanners in the future if necessary). {code:java} import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner; import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner; @RunWith(Parameterized.class) public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { public QueryPlanner planner; @Parameters(name = "{0}") public static Collection> planners(){ return Arrays.asList( CalciteQueryPlanner.class, ZetaSQLQueryPlanner.class) } BeamSqlDslUdfUdafTest(Class planner){ this.planner = planner; } // TODO refactor each test that run SqlTransform::query to use SqlTransform::withQueryPlannerClass(this.planner) ... } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10478) Beam Zeta SQL docs should call out additional dependency
Jacob Ferriero created BEAM-10478: - Summary: Beam Zeta SQL docs should call out additional dependency Key: BEAM-10478 URL: https://issues.apache.org/jira/browse/BEAM-10478 Project: Beam Issue Type: New Feature Components: dsl-sql-zetasql Reporter: Jacob Ferriero

The documentation should explain that using Beam ZetaSQL requires both beam-sdks-java-extensions-sql and beam-sdks-java-extensions-sql-zetasql. [This docs page|https://beam.apache.org/documentation/dsls/sql/overview/] is a bit misleading, as you will get a ClassNotFoundException if you try to use setPlannerName without this additional dependency installed.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
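[Editor's note: a docs snippet might show both artifacts side by side. Gradle form shown purely as an illustration; verify the artifact coordinates against Maven Central and substitute a real Beam version for the placeholder property.]

```groovy
dependencies {
  implementation "org.apache.beam:beam-sdks-java-extensions-sql:${beamVersion}"
  // Required in addition to the above; without it, setPlannerName with the
  // ZetaSQL planner fails with a ClassNotFoundException at pipeline build time.
  implementation "org.apache.beam:beam-sdks-java-extensions-sql-zetasql:${beamVersion}"
}
```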
[jira] [Updated] (BEAM-10473) RowJson should support DATETIME
[ https://issues.apache.org/jira/browse/BEAM-10473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-10473: -- Description:

Add support for DATETIME types by adding an interface to JsonToRow that accepts a joda DateTimeFormatter for parsing DateTime JSON string fields, or specifies a behavior for treating numbers as unix timestamps. This is crucial for SQL pipelines processing streaming data with timestamps.

The interface might look something like this:

{code:java}
/** For parsing JSON string fields containing dates to DATETIME in the target schema. */
JsonToRow::withDateTimeFormatter(org.joda.time.format.DateTimeFormatter)

/** For parsing JSON number fields containing milliseconds since the UNIX epoch that are listed as DATETIME in the target schema. */
JsonToRow::usingUnixMillis()

/** For parsing JSON number fields containing seconds since the UNIX epoch that are listed as DATETIME in the target schema. */
JsonToRow::usingUnixSeconds()
{code}

was:

Add support for DATETIME types by adding an interface to JsonToRow that accepts a joda DateTimeFormatter for parsing DateTime JSON string fields or specifies a behavior for treating numbers as unix timestamps.

The interface might look something like this:

{code:java}
/** For parsing JSON string fields containing dates to DATETIME in the target schema. */
JsonToRow::withDateTimeFormatter(org.joda.time.format.DateTimeFormatter)

/** For parsing JSON number fields containing milliseconds since the UNIX epoch that are listed as DATETIME in the target schema. */
JsonToRow::usingUnixMillis()

/** For parsing JSON number fields containing seconds since the UNIX epoch that are listed as DATETIME in the target schema. */
JsonToRow::usingUnixSeconds()
{code}

> RowJson should support DATETIME
> -------------------------------
>
> Key: BEAM-10473
> URL: https://issues.apache.org/jira/browse/BEAM-10473
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-core
> Reporter: Jacob Ferriero
> Priority: P2
>
> Add support for DATETIME types by adding an interface to JsonToRow that
> accepts a joda DateTimeFormatter for parsing DateTime JSON string fields, or
> specifies a behavior for treating numbers as unix timestamps.
> This is crucial for SQL pipelines processing streaming data with timestamps.
> The interface might look something like this:
> {code:java}
> /** For parsing JSON string fields containing dates to DATETIME in the target schema. */
> JsonToRow::withDateTimeFormatter(org.joda.time.format.DateTimeFormatter)
>
> /** For parsing JSON number fields containing milliseconds since the UNIX epoch that are listed as DATETIME in the target schema. */
> JsonToRow::usingUnixMillis()
>
> /** For parsing JSON number fields containing seconds since the UNIX epoch that are listed as DATETIME in the target schema. */
> JsonToRow::usingUnixSeconds()
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10473) RowJson should support DATETIME
[ https://issues.apache.org/jira/browse/BEAM-10473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-10473: -- Summary: RowJson should support DATETIME (was: JsonToRow should support DATETIME)

> RowJson should support DATETIME
> -------------------------------
>
> Key: BEAM-10473
> URL: https://issues.apache.org/jira/browse/BEAM-10473
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-core
> Reporter: Jacob Ferriero
> Priority: P2
>
> Add support for DATETIME types by adding an interface to JsonToRow that
> accepts a joda DateTimeFormatter for parsing DateTime JSON string fields, or
> specifies a behavior for treating numbers as unix timestamps.
> The interface might look something like this:
> {code:java}
> /** For parsing JSON string fields containing dates to DATETIME in the target schema. */
> JsonToRow::withDateTimeFormatter(org.joda.time.format.DateTimeFormatter)
>
> /** For parsing JSON number fields containing milliseconds since the UNIX epoch that are listed as DATETIME in the target schema. */
> JsonToRow::usingUnixMillis()
>
> /** For parsing JSON number fields containing seconds since the UNIX epoch that are listed as DATETIME in the target schema. */
> JsonToRow::usingUnixSeconds()
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10473) JsonToRow should support DATETIME
Jacob Ferriero created BEAM-10473: - Summary: JsonToRow should support DATETIME Key: BEAM-10473 URL: https://issues.apache.org/jira/browse/BEAM-10473 Project: Beam Issue Type: New Feature Components: sdk-java-core Reporter: Jacob Ferriero

Add support for DATETIME types by adding an interface to JsonToRow that accepts a joda DateTimeFormatter for parsing DateTime JSON string fields, or specifies a behavior for treating numbers as unix timestamps.

The interface might look something like this:

{code:java}
/** For parsing JSON string fields containing dates to DATETIME in the target schema. */
JsonToRow::withDateTimeFormatter(org.joda.time.format.DateTimeFormatter)

/** For parsing JSON number fields containing milliseconds since the UNIX epoch that are listed as DATETIME in the target schema. */
JsonToRow::usingUnixMillis()

/** For parsing JSON number fields containing seconds since the UNIX epoch that are listed as DATETIME in the target schema. */
JsonToRow::usingUnixSeconds()
{code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
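[Editor's note: the three parsing behaviors the proposed interface describes can be sketched in plain Java. This uses java.time rather than joda purely for illustration, and the method names below are hypothetical stand-ins for the proposal, not an existing Beam API.]

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class DateTimeParsingSketch {
    // withDateTimeFormatter(...): parse a JSON string field using a
    // user-supplied pattern (UTC assumed here for simplicity).
    static Instant parseWithFormatter(String value, DateTimeFormatter fmt) {
        return LocalDateTime.parse(value, fmt).toInstant(ZoneOffset.UTC);
    }

    // usingUnixMillis(): treat a JSON number as milliseconds since the UNIX epoch.
    static Instant fromUnixMillis(long millis) {
        return Instant.ofEpochMilli(millis);
    }

    // usingUnixSeconds(): treat a JSON number as seconds since the UNIX epoch.
    static Instant fromUnixSeconds(long seconds) {
        return Instant.ofEpochSecond(seconds);
    }
}
```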
[jira] [Created] (BEAM-10437) Add zeta SQL GREATEST support
Jacob Ferriero created BEAM-10437: - Summary: Add zeta SQL GREATEST support Key: BEAM-10437 URL: https://issues.apache.org/jira/browse/BEAM-10437 Project: Beam Issue Type: New Feature Components: dsl-sql-zetasql Reporter: Jacob Ferriero -- This message was sent by Atlassian Jira (v8.3.4#803005)
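[Editor's note: the interesting part of GREATEST is its NULL handling. In ZetaSQL, GREATEST returns NULL if any argument is NULL, unlike SQL dialects that skip NULLs. A minimal Java sketch of those semantics, as a UDF implementation might need (illustrative only, not the Beam implementation):]

```java
public class GreatestSketch {
    // ZetaSQL GREATEST(x1, ..., xn): the largest argument, or NULL if any
    // argument is NULL (NULL is not skipped).
    static Double greatest(Double... args) {
        double max = Double.NEGATIVE_INFINITY;
        for (Double a : args) {
            if (a == null) {
                return null; // a single NULL argument makes the result NULL
            }
            max = Math.max(max, a);
        }
        return max;
    }
}
```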
[jira] [Created] (BEAM-10436) Add zeta SQL LOG / LOG10 support
Jacob Ferriero created BEAM-10436: - Summary: Add zeta SQL LOG / LOG10 support Key: BEAM-10436 URL: https://issues.apache.org/jira/browse/BEAM-10436 Project: Beam Issue Type: New Feature Components: dsl-sql-zetasql Reporter: Jacob Ferriero -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-10419) Flaky FhirIOWriteIT integration test in java postcommit
[ https://issues.apache.org/jira/browse/BEAM-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153159#comment-17153159 ] Jacob Ferriero commented on BEAM-10419: ---

It seems that the logic that moves files to "claim" them for a batch import in FhirIO.Import is sometimes trying to copy a file that has already been deleted. For the sake of explanation (and refreshing my own memory), FhirIO.Import works as follows:

# Buffer elements in a bundle to a newline-delimited JSON file in the tempDir.
# Group these files into batches (of 10,000 files); this avoids creating many load jobs when batches are small.
# Copy each batch of files to a unique sub temp dir, let's call it a batchDir (this is used to specify a prefix + wildcard for each load job), in [ImportFn|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java#L1033-L1040].
# Start the load job and block until completion (this deletes the files in the batchDir [here|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java#L1051] and the corresponding files in the original dir [here|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java#L1065]).
# Delete all the (original) files in the tempDir once the window is closed (based on this [usage of Wait.on|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java#L840-L870]).

Background on why this logic is so complicated is in this review [thread|https://github.com/apache/beam/pull/11339#discussion_r423357007].

The flakiness seems to occur when attempting the copy in step 3 on a file that does not exist. My hunch is that the deletion in step 5 may not be properly waiting on the ImportFn to complete. Could this happen if the ImportFn task is rescheduled for a particular batch?

FYI I do not have bandwidth to dig deeper into this / fix this right now, and [~lastomato] has taken over maintenance of these IO connectors as I am not currently engaged w/ Healthcare customers.

{noformat}
Caused by: java.io.IOException: Error executing batch GCS request
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.executeBatches(GcsUtil.java:617)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.copy(GcsUtil.java:718)
    at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.copy(GcsFileSystem.java:168)
    at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:289)
    at org.apache.beam.sdk.io.gcp.healthcare.FhirIO$Import$ImportFn.importBatch(FhirIO.java:1040)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Error trying to rewrite gs://temp-storage-for-healthcare-io-tests/fhirImportBatch-60962352-ef3c-4d42-8079-c17ac430ebd6.ndjson to gs://temp-storage-for-healthcare-io-tests/tmp-22b3fd94-1455-42b0-b24a-55439eff5647/fhirImportBatch-60962352-ef3c-4d42-8079-c17ac430ebd6.ndjson: {"code":404,"errors":[{"domain":"global","message":"No such object: temp-storage-for-healthcare-io-tests/fhirImportBatch-60962352-ef3c-4d42-8079-c17ac430ebd6.ndjson","reason":"notFound"}],"message":"No such object: temp-storage-for-healthcare-io-tests/fhirImportBatch-60962352-ef3c-4d42-8079-c17ac430ebd6.ndjson"}
{noformat}

> Flaky FhirIOWriteIT integration test in java postcommit
> -------------------------------------------------------
>
> Key: BEAM-10419
> URL: https://issues.apache.org/jira/browse/BEAM-10419
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Reporter: Yichi Zhang
> Assignee: Jacob Ferriero
> Priority: P1
> Labels: flake
>
> See history
> https://ci-beam.apache.org/job/beam_PostCommit_Java/6320/testReport/junit/org.apache.beam.sdk.io.gcp.healthcare/FhirIOWriteIT/history/?start=25

-- This message was sent by Atlassian Jira (v8.3.4#803005)
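[Editor's note: the five-step import flow described in the comment above, reduced to a local-filesystem sketch for clarity. java.nio.file stands in for GCS and all names are illustrative, not the FhirIO implementation. The reported 404 flake corresponds to the copy in importBatch throwing because a source file was already removed by the window-close cleanup (step 5) racing with it:]

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class ImportFlowSketch {
    // Step 2: group buffered files into batches (10,000 in FhirIO) to avoid
    // issuing one load job per small batch.
    static List<List<Path>> batch(List<Path> files, int batchSize) {
        List<List<Path>> batches = new ArrayList<>();
        for (int i = 0; i < files.size(); i += batchSize) {
            batches.add(new ArrayList<>(
                files.subList(i, Math.min(i + batchSize, files.size()))));
        }
        return batches;
    }

    // Steps 3-4: "claim" a batch by copying it into its own batchDir, run the
    // load job over batchDir/*, then delete both the copies and the originals.
    // The reported flake surfaces at the copy below when an original has
    // already been deleted, e.g. by the step-5 cleanup or a competing retry.
    static void importBatch(List<Path> batch, Path batchDir) throws IOException {
        Files.createDirectories(batchDir);
        for (Path f : batch) {
            // Throws NoSuchFileException (the local analog of the GCS 404)
            // if f was already deleted.
            Files.copy(f, batchDir.resolve(f.getFileName()));
        }
        // ... start the import job over batchDir and block until it completes ...
        for (Path f : batch) {
            Files.deleteIfExists(batchDir.resolve(f.getFileName()));
            Files.deleteIfExists(f);
        }
    }
}
```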
[jira] [Created] (BEAM-10394) Add zeta SQL SQRT support
Jacob Ferriero created BEAM-10394: - Summary: Add zeta SQL SQRT support Key: BEAM-10394 URL: https://issues.apache.org/jira/browse/BEAM-10394 Project: Beam Issue Type: New Feature Components: dsl-sql-zetasql Reporter: Jacob Ferriero -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10393) Add zeta SQL POW support
Jacob Ferriero created BEAM-10393: - Summary: Add zeta SQL POW support Key: BEAM-10393 URL: https://issues.apache.org/jira/browse/BEAM-10393 Project: Beam Issue Type: New Feature Components: dsl-sql-zetasql Reporter: Jacob Ferriero -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10061) ReadAllFromTextWithFilename
[ https://issues.apache.org/jira/browse/BEAM-10061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-10061: -- Component/s: (was: io-py-gcp) sdk-java-core io-py-files io-java-files > ReadAllFromTextWithFilename > --- > > Key: BEAM-10061 > URL: https://issues.apache.org/jira/browse/BEAM-10061 > Project: Beam > Issue Type: New Feature > Components: io-java-files, io-py-files, sdk-java-core, sdk-py-core > Environment: Dataflow with Python >Reporter: Ryan Canty >Priority: P2 > > I am trying to create a job that reads from GCS executes some code against > each line and creates a PCollection with the line and the file. So basically > what I'd like is a combination of textio.ReadTextWithFilename and > textio.ReadAllFromText -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-10061) ReadAllFromTextWithFilename
[ https://issues.apache.org/jira/browse/BEAM-10061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126295#comment-17126295 ] Jacob Ferriero commented on BEAM-10061: --- This would also be useful in java sdk as that's what I typically see customers using. (added java components to this issue) > ReadAllFromTextWithFilename > --- > > Key: BEAM-10061 > URL: https://issues.apache.org/jira/browse/BEAM-10061 > Project: Beam > Issue Type: New Feature > Components: io-java-files, io-py-files, sdk-java-core, sdk-py-core > Environment: Dataflow with Python >Reporter: Ryan Canty >Priority: P2 > > I am trying to create a job that reads from GCS executes some code against > each line and creates a PCollection with the line and the file. So basically > what I'd like is a combination of textio.ReadTextWithFilename and > textio.ReadAllFromText -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10175) FhirIO execute bundle uses deprecated auth uri param
Jacob Ferriero created BEAM-10175: - Summary: FhirIO execute bundle uses deprecated auth uri param Key: BEAM-10175 URL: https://issues.apache.org/jira/browse/BEAM-10175 Project: Beam Issue Type: Improvement Components: io-java-gcp Affects Versions: 2.22.0 Reporter: Jacob Ferriero Assignee: Jacob Ferriero -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-10141) HL7v2IO read methods should assign sendTime timestamps
[ https://issues.apache.org/jira/browse/BEAM-10141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero reassigned BEAM-10141: - Assignee: Jacob Ferriero > HL7v2IO read methods should assign sendTime timestamps > -- > > Key: BEAM-10141 > URL: https://issues.apache.org/jira/browse/BEAM-10141 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp > Reporter: Jacob Ferriero > Assignee: Jacob Ferriero > Priority: P3 > > Per the suggestion in this > [conversation|https://github.com/apache/beam/pull/11596#discussion_r427633240], > add timestamped values and a watermark estimate to > HL7v2IO.ListMessages. The same argument can be made for making HL7v2IO.Read > return timestamped values. > This should be optional and default to false so as not to be interface-breaking. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10141) HL7v2IO read methods should assign sendTime timestamps
Jacob Ferriero created BEAM-10141: - Summary: HL7v2IO read methods should assign sendTime timestamps Key: BEAM-10141 URL: https://issues.apache.org/jira/browse/BEAM-10141 Project: Beam Issue Type: Improvement Components: io-java-gcp Reporter: Jacob Ferriero Per the suggestion in this [conversation|https://github.com/apache/beam/pull/11596#discussion_r427633240], add timestamped values and a watermark estimate to HL7v2IO.ListMessages. The same argument can be made for making HL7v2IO.Read return timestamped values. This should be optional and default to false so as not to be interface-breaking. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero resolved BEAM-9856. -- Fix Version/s: 2.22.0 Resolution: Fixed > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp > Reporter: Jacob Ferriero > Assignee: Jacob Ferriero > Priority: P3 > Fix For: 2.22.0 > > Time Spent: 10.5h > Remaining Estimate: 0h > > Currently the List Messages API paginates through in a single ProcessElement > call. > However we could get a restriction based on createTime using Messages.List > filter and orderby. > > This is in line with the future roadmap, where an HL7v2 bulk export API becomes > available that should allow splitting on e.g. the create-time dimension. > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dynamically splittable via splittable DoFn (sexy, beam idiomatic: make > optimization the runner's problem; potentially unnecessarily complex for this > use case) > 2. static splitting on some time partition, e.g. finding the earliest > createTime and emitting a PCollection of 1 hour partitions and paginating > through each hour of data within the time frame that the store spans, in a > separate ProcessElement (easy to implement but will likely have hot keys / > stragglers based on "busy hours") > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-9831) HL7v2IO Improvements
[ https://issues.apache.org/jira/browse/BEAM-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero resolved BEAM-9831. -- Fix Version/s: 2.22.0 Resolution: Fixed > HL7v2IO Improvements > > > Key: BEAM-9831 > URL: https://issues.apache.org/jira/browse/BEAM-9831 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P2 > Fix For: 2.22.0 > > Time Spent: 2h 50m > Remaining Estimate: 0h > > # HL7v2MessageCoder constructor should be public for use by end users > # Currently HL7v2IO.ListHL7v2Messages blocks on pagination through list > messages results before emitting any output data elements (due to high fan > out from a single input element). We should add early firings so that > downstream processing can proceed on early pages while later pages are still > being scrolled through. > # We should drop all output only fields of HL7v2Message and only keep data > and labels when calling ingestMessages, rather than expecting the user to do > this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero resolved BEAM-9468. -- Resolution: Fixed > Add Google Cloud Healthcare API IO Connectors > - > > Key: BEAM-9468 > URL: https://issues.apache.org/jira/browse/BEAM-9468 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Fix For: 2.22.0 > > Time Spent: 53.5h > Remaining Estimate: 0h > > Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud > Healthcare API|https://cloud.google.com/healthcare/docs/] > HL7v2IO > FHIRIO > DICOM -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-9468: - Fix Version/s: 2.22.0 > Add Google Cloud Healthcare API IO Connectors > - > > Key: BEAM-9468 > URL: https://issues.apache.org/jira/browse/BEAM-9468 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Fix For: 2.22.0 > > Time Spent: 53.5h > Remaining Estimate: 0h > > Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud > Healthcare API|https://cloud.google.com/healthcare/docs/] > HL7v2IO > FHIRIO > DICOM -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17113411#comment-17113411 ] Jacob Ferriero commented on BEAM-9468: -- Yes! > Add Google Cloud Healthcare API IO Connectors > - > > Key: BEAM-9468 > URL: https://issues.apache.org/jira/browse/BEAM-9468 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 53.5h > Remaining Estimate: 0h > > Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud > Healthcare API|https://cloud.google.com/healthcare/docs/] > HL7v2IO > FHIRIO > DICOM -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10040) TestPubsubSignal not signalling success w/ Dataflow Runner
[ https://issues.apache.org/jira/browse/BEAM-10040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-10040: -- Issue Type: Bug (was: Improvement) > TestPubsubSignal not signalling success w/ Dataflow Runner > -- > > Key: BEAM-10040 > URL: https://issues.apache.org/jira/browse/BEAM-10040 > Project: Beam > Issue Type: Bug > Components: test-failures >Reporter: Jacob Ferriero >Priority: P2 > > The issue with FhirIOReadIT seems to be some misuse of TestPubsubSignal > [Example > Job|https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing] > clearly has the expected >2000 elements added to the "waitForAnyMessage" task > but the success signal never gets published to the results topic. > Notably there are job level warnings about metric descriptors and [warnings > in shuffle > logs|https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054×tamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z] > which warns: > "Update range task returned 'invalid argument'. Assuming lost lease for work > with id 5061980071068333770 (expiration time: 1589940982000, now: > 1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For > more information, see > https://cloud.google.com/dataflow/docs/guides/common-errors."; -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10040) TestPubsubSignal not signalling success w/ Dataflow Runner
Jacob Ferriero created BEAM-10040: - Summary: TestPubsubSignal not signalling success w/ Dataflow Runner Key: BEAM-10040 URL: https://issues.apache.org/jira/browse/BEAM-10040 Project: Beam Issue Type: Improvement Components: test-failures Reporter: Jacob Ferriero The issue with FhirIOReadIT seems to be some misuse of TestPubsubSignal [Example Job|https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing] clearly has the expected >2000 elements added to the "waitForAnyMessage" task but the success signal never gets published to the results topic. Notably there are job level warnings about metric descriptors and [warnings in shuffle logs|https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054×tamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z] which warns: "Update range task returned 'invalid argument'. Assuming lost lease for work with id 5061980071068333770 (expiration time: 1589940982000, now: 1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors."; -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9990) FhirIO should support conditional create / update methods
[ https://issues.apache.org/jira/browse/BEAM-9990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-9990: - Description: There are many use cases where it is expected that calling executeBundles in a distributed environment may fail (e.g. trying to create a resource that already exists). We should add classes to support the following methods as implementations of FhirIO.Write to provide more robust reconciliation strategies for Dead Letter Queues involving FhirIO.Write https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/conditionalUpdate https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/update > FhirIO should support conditional create / update methods > - > > Key: BEAM-9990 > URL: https://issues.apache.org/jira/browse/BEAM-9990 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Major > > There are many use cases where it is expected that calling executeBundles in > a distributed environment may fail (e.g. trying to create a resource that > already exists). > We should add classes to support the following methods as implementations of > FhirIO.Write to provide more robust reconciliation strategies for Dead Letter > Queues involving FhirIO.Write > https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/conditionalUpdate > https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create > https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/update -- This message was sent by Atlassian Jira (v8.3.4#803005)
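In FHIR, a conditional update is an HTTP PUT against [base]/[type]?[search criteria]: the server updates the single resource matching the criteria, so retried bundles do not create duplicates. As a rough sketch of the request path such a FhirIO.Write implementation would have to build, assuming a standard FHIR base URL; the ConditionalUpdateUrl helper and all store/parameter names below are hypothetical illustrations, not part of FhirIO:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;

public class ConditionalUpdateUrl {

  // Builds [fhirStoreBase]/fhir/[resourceType]?[url-encoded search criteria],
  // the target of a FHIR conditional update PUT. The store base and criteria
  // passed by callers are placeholders for illustration only.
  public static String build(
      String fhirStoreBase, String resourceType, Map<String, String> criteria) {
    String query =
        criteria.entrySet().stream()
            .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
            .collect(Collectors.joining("&"));
    String base = fhirStoreBase + "/fhir/" + resourceType;
    return query.isEmpty() ? base : base + "?" + query;
  }

  private static String encode(String s) {
    return URLEncoder.encode(s, StandardCharsets.UTF_8);
  }
}
```

Identifier-based criteria (e.g. an MRN) are the usual choice here, since they make the write idempotent under dead-letter-queue replay.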
[jira] [Created] (BEAM-9990) FhirIO should support conditional create / update methods
Jacob Ferriero created BEAM-9990: Summary: FhirIO should support conditional create / update methods Key: BEAM-9990 URL: https://issues.apache.org/jira/browse/BEAM-9990 Project: Beam Issue Type: Improvement Components: io-java-gcp Reporter: Jacob Ferriero Assignee: Jacob Ferriero -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9779) HL7v2IOWriteIT is flaky
[ https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099525#comment-17099525 ] Jacob Ferriero commented on BEAM-9779: -- [~chamikara] Have the changes in https://github.com/apache/beam/pull/11450 stabilized this test? Should we close this? > HL7v2IOWriteIT is flaky > --- > > Key: BEAM-9779 > URL: https://issues.apache.org/jira/browse/BEAM-9779 > Project: Beam > Issue Type: Bug > Components: io-java-gcp, test-failures >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Critical > Time Spent: 4h 10m > Remaining Estimate: 0h > > There seems to be a race condition somewhere in HL7v2IOWriteIT that causes > flakiness. > https://builds.apache.org/job/beam_PostCommit_Java/5947/ > https://builds.apache.org/job/beam_PostCommit_Java/5943/ > https://builds.apache.org/job/beam_PostCommit_Java/5942/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (BEAM-9847) Verify If Triggering allows emitting eager results when processing a single element in HL7v2IO.
[ https://issues.apache.org/jira/browse/BEAM-9847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero resolved BEAM-9847. -- Fix Version/s: Not applicable Resolution: Not A Bug > Verify If Triggering allows emitting eager results when processing a single > element in HL7v2IO. > --- > > Key: BEAM-9847 > URL: https://issues.apache.org/jira/browse/BEAM-9847 > Project: Beam > Issue Type: Task > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > Fix For: Not applicable > > > Due to the nature of the HL7v2 API, HL7v2IO.ListHL7v2Messages follows the > pattern of [paginating through all ListMessages results in a single > ProcessElement call.|#diff-9cd595f078378218ccc01ce7e19ca766R447] > > Upon testing with a customer against an HL7v2 store with 350k messages, we > observed that the ListMessages transform was not outputting any elements, > "hanging" for a long time (which was assumed to be the single thread > paginating through all the results). > > We added the following triggering in hopes that it would emit early results: > {code:java} > .apply( > Window.into(new GlobalWindows()) > .triggering( > AfterWatermark.pastEndOfWindow() > .withEarlyFirings( > AfterProcessingTime.pastFirstElementInPane() > .plusDelayOf(Duration.standardSeconds(1)))) > .discardingFiredPanes()) > {code} > Our tests with this triggering seemed to indicate that it did not "hang" like > the first test and seemed to output a more steady stream of elements. > The reviewer states that bundles must be committed atomically, so no output > elements of a single ProcessElement call can proceed to downstream stages > until all output elements for that call are ready. > There may be other things at play here. Will seek to reproduce in a way that > definitively confirms output elements can be eagerly output during the > execution of a single ProcessElement call before it completes. 
> CC: [~pabloem] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097785#comment-17097785 ] Jacob Ferriero commented on BEAM-9856: -- I have begun work on a POC for what this would look like as a splittable DoFn in #11596. > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > Time Spent: 10m > Remaining Estimate: 0h > > Currently the List Messages API paginates through in a single ProcessElement > call. > However we could get a restriction based on createTime using Messages.List > filter and orderBy. > > This is in line with the future roadmap: if an HL7v2 bulk export API becomes > available, it should allow splitting on some dimension (e.g. create time). > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: make > optimization the runner's problem; potentially unnecessarily complex for this > use case) > 2. static splitting on some time partition, e.g. finding the earliest > createTime and emitting a PCollection of 1-hour partitions, then paginating > through each hour of data w/in the time frame that the store spans, in a > separate ProcessElement. (easy to implement but will likely have hot keys / > stragglers based on "busy hours") > -- This message was sent by Atlassian Jira (v8.3.4#803005)
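The second form above (static splitting on a time partition) can be sketched in plain Java: compute the store's earliest and latest createTime, then emit one hour-aligned range per element so each range paginates independently in its own ProcessElement call. The HourPartitions and TimeRange names below are invented for this sketch and are not HL7v2IO classes; times are epoch milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

public class HourPartitions {
  static final long HOUR_MS = 3_600_000L;

  // Stand-in for a partition element to fan out as a PCollection; not a Beam class.
  public static class TimeRange {
    public final long startMs;
    public final long endMs;

    public TimeRange(long startMs, long endMs) {
      this.startMs = startMs;
      this.endMs = endMs;
    }
  }

  // Splits [earliestMs, latestMs) into consecutive one-hour ranges; the final
  // range is truncated at latestMs so partitions never overlap or overshoot.
  public static List<TimeRange> split(long earliestMs, long latestMs) {
    List<TimeRange> partitions = new ArrayList<>();
    for (long start = earliestMs; start < latestMs; start += HOUR_MS) {
      partitions.add(new TimeRange(start, Math.min(start + HOUR_MS, latestMs)));
    }
    return partitions;
  }
}
```

This illustrates the hot-key caveat from the comment directly: every "busy hour" becomes one element, so a skewed store yields straggler partitions no matter how many workers are available.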
[jira] [Comment Edited] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097008#comment-17097008 ] Jacob Ferriero edited comment on BEAM-9856 at 4/30/20, 9:59 PM: Additional Consideration: The current ListMessages implementation allows a user to specify a filter. If we are leveraging filters in our sharding approach we should use AND to respect the user's filter as well. This could potentially lead to wasted queries due to disjoint filtering on sendTime. was (Author: data-runner0): Additional Consideration: The current ListMessages implementation allows a user to specify a filter. If we are leveraging filters in our sharding approach we should use AND to respect the user's filter as well. This could potentially lead to wasted queries due to disjoint filtering on createTime. > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > Currently the List Messages API paginates through in a single ProcessElement > call. > However we could get a restriction based on createTime using Messages.List > filter and orderBy. > > This is in line with the future roadmap: if an HL7v2 bulk export API becomes > available, it should allow splitting on some dimension (e.g. create time). > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: make > optimization the runner's problem; potentially unnecessarily complex for this > use case) > 2. static splitting on some time partition, e.g. finding the earliest > createTime and emitting a PCollection of 1-hour partitions, then paginating > through each hour of data w/in the time frame that the store spans, in a > separate ProcessElement. (easy to implement but will likely have hot keys / > stragglers based on "busy hours") > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096961#comment-17096961 ] Jacob Ferriero edited comment on BEAM-9856 at 4/30/20, 9:59 PM: The existing GA [HL7v2 Messages.List API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list] allows us to specify a filter and order by sendTime. We should be able to use this as our restriction dimension to make this a splittable DoFn. I will investigate the feasibility of this. Basic Design Proposal: Each Messages.List query will have a sendTime filter based on its restriction and orderBy sendTime in order to closely mimic the [OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html] pattern. * getInitialRestriction: run a query against the HL7v2 store to get the earliest sendTime * RestrictionTracker: keep a "watermark" based on sendTime * splitRestriction: split the timestamp range based on a fraction (this will require that an existing HL7v2 Messages.List call can be notified to "stop paginating" at an arbitrary sendTime). Note that each List query might not get to the end of its sendTime filter (due to splitting). Instead it should just stop processing results when it encounters a record past the end of its restriction. This should allow us to more eagerly emit results as certain restrictions are completed. Open Questions: * Is there a canonical way of specifying more than one initial restriction, based on the assumption that for most use cases you'll want to split at least n times upfront (e.g. partition by day/hour to start and dynamically split from there)? Is this an anti-pattern? * Should the initial restriction have an endTime? Should this be an optional user parameter for the transform? What should the default be (e.g. Instant.now() just before firing the first query)? 
Or should ListMessages just scroll until it is "caught up" and there are no newer messages? Jake's two cents: I consider this List Messages transform to primarily serve a batch / bounded backfill or replay use case. I believe real-time use cases should use the Pub/Sub notifications for event-driven / streaming updates from the HL7v2 store with HL7v2IO.readAll() (as this allows for much greater parallelization and is more likely to "keep up" during higher throughput). However, there is nothing stopping a user from using ListMessages against a store that is still being updated. If this becomes a splittable DoFn that fires many ListMessages calls throughout its lifetime and our initial restriction is [minSendTime, inf), this could become an unbounded source (or more specifically unbounded per element). I feel we should force ListMessages to be bounded per element by always having an endSendTime. was (Author: data-runner0): The existing GA [HL7v2 Messages.List API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list] allows us to specify a filter and order by createTime. We should be able to use this as our restriction dimension to make this a splittable DoFn. I will investigate the feasibility of this. Basic Design Proposal: Each Messages.List query will have a createTime filter based on its restriction and orderBy createTime in order to closely mimic the [OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html] pattern. * getInitialRestriction: run a query against the HL7v2 store to get the earliest createTime * RestrictionTracker: keep a "watermark" based on createTime * splitRestriction: split the timestamp range based on a fraction (this will require that an existing HL7v2 Messages.List call can be notified to "stop paginating" at an arbitrary createTime). 
Note that each List query might not get to the end of its createTime filter (due to splitting). Instead it should just stop processing results when it encounters a record past the end of its restriction. This should allow us to more eagerly emit results as certain restrictions are completed. Open Questions: * Is there a canonical way of specifying more than one initial restriction, based on the assumption that for most use cases you'll want to split at least n times upfront (e.g. partition by day/hour to start and dynamically split from there)? Is this an anti-pattern? * Should the initial restriction have an endTime? Should this be an optional user parameter for the transform? What should the default be (e.g. Instant.now() just before firing the first query)? Or should ListMessages just scroll until it is "caught up" and there are no newer messages? Jake's two cents: I consider this List Messages transform to primarily serv
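The proposed splitRestriction behavior can be sketched in plain Java: treat the [from, to) sendTime range like an OffsetRange and split the unclaimed portion at a fraction, analogous to OffsetRangeTracker.trySplit. The SendTimeRangeSplitter and TimestampRange names below are invented for this sketch and are not the actual HL7v2IO restriction classes; times are epoch milliseconds.

```java
public class SendTimeRangeSplitter {

  // Stand-in restriction type for this sketch; not a Beam or HL7v2IO class.
  public static class TimestampRange {
    public final long fromMs;
    public final long toMs;

    public TimestampRange(long fromMs, long toMs) {
      this.fromMs = fromMs;
      this.toMs = toMs;
    }
  }

  // Splits the unclaimed portion of the range (past currentMs) at `fraction`:
  // element [0] is the primary the current worker keeps paginating through,
  // element [1] is the residual handed back to the runner for another worker.
  // Returns null when there is nothing left past currentMs to hand off.
  public static TimestampRange[] trySplit(
      TimestampRange range, long currentMs, double fraction) {
    long splitMs = currentMs + Math.max(1L, (long) ((range.toMs - currentMs) * fraction));
    if (splitMs >= range.toMs) {
      return null;
    }
    return new TimestampRange[] {
      new TimestampRange(range.fromMs, splitMs),
      new TimestampRange(splitMs, range.toMs)
    };
  }
}
```

This is where the "stop paginating at an arbitrary sendTime" requirement from the comment comes in: after a split, the in-flight Messages.List pagination must stop emitting once it reads a message at or past the new primary's end.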
[jira] [Commented] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097008#comment-17097008 ] Jacob Ferriero commented on BEAM-9856: -- Additional Consideration: The current ListMessages implementation allows a user to specify a filter. If we are leveraging filters in our sharding approach we should use AND to respect the user's filter as well. This could potentially lead to wasted queries due to disjoint filtering on createTime. > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > Currently the List Messages API paginates through in a single ProcessElement > call. > However we could get a restriction based on createTime using Messages.List > filter and orderBy. > > This is in line with the future roadmap: if an HL7v2 bulk export API becomes > available, it should allow splitting on some dimension (e.g. create time). > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: make > optimization the runner's problem; potentially unnecessarily complex for this > use case) > 2. static splitting on some time partition, e.g. finding the earliest > createTime and emitting a PCollection of 1-hour partitions, then paginating > through each hour of data w/in the time frame that the store spans, in a > separate ProcessElement. (easy to implement but will likely have hot keys / > stragglers based on "busy hours") > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-9856: - Description: Currently the List Messages API paginates through in a single ProcessElement call. However we could get a restriction based on createTime using Messages.List filter and orderBy. This is in line with the future roadmap: if an HL7v2 bulk export API becomes available, it should allow splitting on some dimension (e.g. create time). Leveraging this bulk export might be a future optimization to explore. This could take one of two forms: 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: make optimization the runner's problem; potentially unnecessarily complex for this use case) 2. static splitting on some time partition, e.g. finding the earliest createTime and emitting a PCollection of 1-hour partitions, then paginating through each hour of data w/in the time frame that the store spans, in a separate ProcessElement. (easy to implement but will likely have hot keys / stragglers based on "busy hours") was: Currently the List Messages API paginates through in a single ProcessElement call. However we could get a restriction based on createTime using Messages.List filter and orderBy. This is in line with the future roadmap: if an HL7v2 bulk export API becomes available, it should allow splitting on some dimension (e.g. create time). Leveraging this bulk export might be a future optimization to explore. This could take one of two forms: 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: make optimization the runner's problem; potentially unnecessarily complex for this use case) 1. static splitting on some time partition, e.g. finding the earliest createTime and emitting a PCollection of 1-hour partitions, then paginating through each hour of data w/in the time frame that the store spans, in a separate ProcessElement. 
(easy to implement but will likely have hot keys / stragglers based on "busy hours") > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > Currently the List Messages API paginates through in a single ProcessElement > call. > However we could get a restriction based on createTime using Messages.List > filter and orderBy. > > This is in line with the future roadmap: if an HL7v2 bulk export API becomes > available, it should allow splitting on some dimension (e.g. create time). > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: make > optimization the runner's problem; potentially unnecessarily complex for this > use case) > 2. static splitting on some time partition, e.g. finding the earliest > createTime and emitting a PCollection of 1-hour partitions, then paginating > through each hour of data w/in the time frame that the store spans, in a > separate ProcessElement. (easy to implement but will likely have hot keys / > stragglers based on "busy hours") > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-9856: - Description: Currently the List Messages API paginates through in a single ProcessElement call. However we could get a restriction based on createTime using Messages.List filter and orderBy. This is in line with the future roadmap: if an HL7v2 bulk export API becomes available, it should allow splitting on some dimension (e.g. create time). Leveraging this bulk export might be a future optimization to explore. This could take one of two forms: 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: make optimization the runner's problem; potentially unnecessarily complex for this use case) 1. static splitting on some time partition, e.g. finding the earliest createTime and emitting a PCollection of 1-hour partitions, then paginating through each hour of data w/in the time frame that the store spans, in a separate ProcessElement. (easy to implement but will likely have hot keys / stragglers based on "busy hours") was: Currently the List Messages API paginates through in a single ProcessElement call. However we could get a restriction based on createTime using Messages.List filter and orderBy. This is in line with the future roadmap: if an HL7v2 bulk export API becomes available, it should allow splitting on some dimension (e.g. create time). Leveraging this bulk export might be a future optimization to explore. This could look like paginating through each hour of data w/in the time frame that the store spans, in a separate thread. > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > Currently the List Messages API paginates through in a single ProcessElement > call. 
> However we could get a restriction based on createTime using Messages.List > filter and orderBy. > > This is in line with the future roadmap: if an HL7v2 bulk export API becomes > available, it should allow splitting on some dimension (e.g. create time). > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: make > optimization the runner's problem; potentially unnecessarily complex for this > use case) > 1. static splitting on some time partition, e.g. finding the earliest > createTime and emitting a PCollection of 1-hour partitions, then paginating > through each hour of data w/in the time frame that the store spans, in a > separate ProcessElement. (easy to implement but will likely have hot keys / > stragglers based on "busy hours") > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-9856: - Description: Currently the List Messages API paginates through in a single ProcessElement Call. However we could get a restriction based on createTime using Messages.List filter and orderby. This is inline with the future roadmap of HL7v2 bulk export API becomes available that should allow splitting on (e.g. create time dimension). Leveraging this bulk export might be a future optimization to explore. This could look like paginating through each hour of data w/ in the time frame that the store spans, in a separate thread. was: Currently the List Messages API paginates through in a single ProcessElement Call. In the future if a bulk export API becomes available that would allow splitting on some dimension (e.g. create time), this should be refactored as a splittable DoFn or at least run several sub queries so that we are not listing an entire store in a single thread. This could look like paginating through each hour of data w/ in the time frame that the store spans, in a separate thread. > HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn > > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > Currently the List Messages API paginates through in a single ProcessElement > Call. > However we could get a restriction based on createTime using Messages.List > filter and orderby. > > This is inline with the future roadmap of HL7v2 bulk export API becomes > available that should allow splitting on (e.g. create time dimension). > Leveraging this bulk export might be a future optimization to explore. > > This could look like paginating through each hour of data w/ in the time > frame that the store spans, in a separate thread. 
> -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-9856: - Summary: HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization (was: HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn) > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > Currently the List Messages API paginates through in a single ProcessElement > Call. > However we could get a restriction based on createTime using Messages.List > filter and orderby. > > This is inline with the future roadmap of HL7v2 bulk export API becomes > available that should allow splitting on (e.g. create time dimension). > Leveraging this bulk export might be a future optimization to explore. > > This could look like paginating through each hour of data w/ in the time > frame that the store spans, in a separate thread. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-9856: - Priority: Minor (was: Major) > HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn > > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > Currently the List Messages API paginates through in a single ProcessElement > Call. > > In the future if a bulk export API becomes available that would allow > splitting on some dimension (e.g. create time), this should be refactored as > a splittable DoFn or at least run several sub queries so that we are not > listing an entire store in a single thread. > > This could look like paginating through each hour of data w/ in the time > frame that the store spans, in a separate thread. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-9856: - Summary: HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn (was: HL7v2IO.ListHL7v2Messages should be refactored once Bulk Export API is available) > HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn > > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Major > > Currently the List Messages API paginates through in a single ProcessElement > Call. > > In the future if a bulk export API becomes available that would allow > splitting on some dimension (e.g. create time), this should be refactored as > a splittable DoFn or at least run several sub queries so that we are not > listing an entire store in a single thread. > > This could look like paginating through each hour of data w/ in the time > frame that the store spans, in a separate thread. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored once Bulk Export API is available
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-9856: - Summary: HL7v2IO.ListHL7v2Messages should be refactored once Bulk Export API is available (was: HL7v2IO.ListMessages should be refactored once Bulk Export API is available) > HL7v2IO.ListHL7v2Messages should be refactored once Bulk Export API is > available > > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Major > > Currently the List Messages API paginates through in a single ProcessElement > Call. > > In the future if a bulk export API becomes available that would allow > splitting on some dimension (e.g. create time), this should be refactored as > a splittable DoFn or at least run several sub queries so that we are not > listing an entire store in a single thread. > > This could look like paginating through each hour of data w/ in the time > frame that the store spans, in a separate thread. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9856) HL7v2IO.ListMessages should be refactored once Bulk Export API is available
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096961#comment-17096961 ] Jacob Ferriero commented on BEAM-9856: -- The existing GA [HL7v2 Messages.List API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list] allows us to specify a filter and order by createTime. We should be able to use this as our restriction dimension to make this a splitable DoFn. I will investigate feasibility of this. Basic Design Proposal: Each Messages.List query will have a createTime filter based on it's restriction and orderBy createTime in order closely mimic the [OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html] pattern. * getInitialRestriction: run a query against HL7v2 store to get earliest createTime * RestrictionTracker: keep a "watermark" based on createTime * splitRestriction: split timestamp range based on fraction (this will require an existing HL7v2 Message List call can be notified to "stop paginating through the at an arbitrary createTime". Note that each List query might not get to the end of it's createTime filter (due to splitting). Instead it should just spot processing results when it encounters are record past the end of it's restriction. This should allow us to more eagerly emit results as certain restrictions are completed. Open Questions: * Is there a canonical way of specifying more than one initial restriction based on assumption that you know for most use cases you'll want to split at least n-times upfront (e.g. partition by day/hour to start and dynamically split from there) ? Is this an anti-pattern because * Should the initial restriction have a endTime? Should this be an optional user parameter for the transform? What should the default be (e.g. Instant.now() just before firing the first query)? 
Or should ListMessages just scroll until it is "caught up" and there are no newer messages? Jake's two cents: I consider this ListMessages transform to primarily serve a batch / bounded backfill or replay use case. I believe real-time use cases should use the Pub/Sub notifications for event-driven / streaming updates from the HL7v2 store with HL7v2IO.readAll() (as this allows for much greater parallelization and is more likely to "keep up" during higher throughput). However, there is nothing stopping a user from using ListMessages against a store that is still being updated. If this becomes a splittable DoFn that fires many ListMessages calls throughout its lifetime and our initial restriction is [minCreateTime, inf), this could become an unbounded source (or, more specifically, unbounded per element). I feel we should force ListMessages to be bounded per element by always having an endCreateTime. > HL7v2IO.ListMessages should be refactored once Bulk Export API is available > --- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Major > > Currently the List Messages API paginates through in a single ProcessElement > Call. > > In the future if a bulk export API becomes available that would allow > splitting on some dimension (e.g. create time), this should be refactored as > a splittable DoFn or at least run several sub queries so that we are not > listing an entire store in a single thread. > > This could look like paginating through each hour of data w/ in the time > frame that the store spans, in a separate thread. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
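The restriction-tracking idea sketched in the design proposal above can be illustrated standalone. This is a simplified, self-contained sketch of a createTime-range restriction mimicking the OffsetRangeTracker pattern; it is not Beam's actual RestrictionTracker API, and the class and method names are invented for illustration:

```java
import java.time.Instant;

/**
 * Standalone sketch (not Beam's RestrictionTracker API) of the proposed
 * createTime-based restriction for ListMessages: a half-open [from, to)
 * timestamp range that can be claimed monotonically and split by fraction.
 */
public class TimestampRangeTracker {
  private final long fromMillis;   // inclusive start of the restriction
  private long toMillis;           // exclusive end; shrinks when split
  private long lastClaimedMillis = Long.MIN_VALUE;

  public TimestampRangeTracker(Instant from, Instant to) {
    this.fromMillis = from.toEpochMilli();
    this.toMillis = to.toEpochMilli();
  }

  /** Claim a message's createTime; false means "stop paginating" for this restriction. */
  public boolean tryClaim(Instant createTime) {
    long t = createTime.toEpochMilli();
    if (t < fromMillis || t < lastClaimedMillis) {
      // List results must be ordered by createTime for this pattern to work.
      throw new IllegalArgumentException("createTime moved backwards; query must orderBy createTime");
    }
    if (t >= toMillis) {
      return false; // record past the end of the restriction: stop processing
    }
    lastClaimedMillis = t;
    return true;
  }

  /** Split off the trailing part of the unclaimed range at the given fraction. */
  public TimestampRangeTracker trySplit(double fraction) {
    long unclaimedFrom = Math.max(lastClaimedMillis + 1, fromMillis);
    long splitAt = unclaimedFrom + (long) (fraction * (toMillis - unclaimedFrom));
    if (splitAt <= unclaimedFrom || splitAt >= toMillis) {
      return null; // nothing left to split
    }
    TimestampRangeTracker residual =
        new TimestampRangeTracker(Instant.ofEpochMilli(splitAt), Instant.ofEpochMilli(toMillis));
    this.toMillis = splitAt; // primary keeps [from, splitAt)
    return residual;
  }

  public Instant getTo() {
    return Instant.ofEpochMilli(toMillis);
  }
}
```

A real splittable DoFn would also need to stop an in-flight List call when `tryClaim` returns false, which is the "stop paginating at an arbitrary createTime" requirement noted above.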
[jira] [Assigned] (BEAM-9856) HL7v2IO.ListMessages should be refactored once Bulk Export API is available
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero reassigned BEAM-9856: Assignee: Jacob Ferriero > HL7v2IO.ListMessages should be refactored once Bulk Export API is available > --- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Major > > Currently the List Messages API paginates through in a single ProcessElement > Call. > > In the future if a bulk export API becomes available that would allow > splitting on some dimension (e.g. create time), this should be refactored as > a splittable DoFn or at least run several sub queries so that we are not > listing an entire store in a single thread. > > This could look like paginating through each hour of data w/ in the time > frame that the store spans, in a separate thread. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9856) HL7v2IO.ListMessages should be refactored once Bulk Export API is available
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-9856: - Description: Currently the List Messages API paginates through in a single ProcessElement Call. In the future if a bulk export API becomes available that would allow splitting on some dimension (e.g. create time), this should be refactored as a splittable DoFn or at least run several sub queries so that we are not listing an entire store in a single thread. This could look like paginating through each hour of data w/ in the time frame that the store spans, in a separate thread. was: Currently the List Messages API paginates through in a single ProcessElement Call. In the future if a bulk export API becomes available that would allow splitting on some dimension (e.g. create time), this should be refactored as a splittable DoFn or at least run several sub queries so that we are not listing an entire store in a single thread. > HL7v2IO.ListMessages should be refactored once Bulk Export API is available > --- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Priority: Major > > Currently the List Messages API paginates through in a single ProcessElement > Call. > > In the future if a bulk export API becomes available that would allow > splitting on some dimension (e.g. create time), this should be refactored as > a splittable DoFn or at least run several sub queries so that we are not > listing an entire store in a single thread. > > This could look like paginating through each hour of data w/ in the time > frame that the store spans, in a separate thread. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-9856) HL7v2IO.ListMessages should be refactored once Bulk Export API is available
[ https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero updated BEAM-9856: - Description: Currently the List Messages API paginates through in a single ProcessElement Call. In the future if a bulk export API becomes available that would allow splitting on some dimension (e.g. create time), this should be refactored as a splittable DoFn or at least run several sub queries so that we are not listing an entire store in a single thread. was: Currently the List Messages API paginates through in a single ProcessElement Call. In the future if a bulk export API becomes available that would allow splitting on some dimension (e.g. create time), this should be refactored as a splittable DoFn. > HL7v2IO.ListMessages should be refactored once Bulk Export API is available > --- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Priority: Major > > Currently the List Messages API paginates through in a single ProcessElement > Call. > > In the future if a bulk export API becomes available that would allow > splitting on some dimension (e.g. create time), this should be refactored as > a splittable DoFn or at least run several sub queries so that we are not > listing an entire store in a single thread. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9856) HL7v2IO.ListMessages should be refactored once Bulk Export API is available
Jacob Ferriero created BEAM-9856: Summary: HL7v2IO.ListMessages should be refactored once Bulk Export API is available Key: BEAM-9856 URL: https://issues.apache.org/jira/browse/BEAM-9856 Project: Beam Issue Type: Improvement Components: io-java-gcp Reporter: Jacob Ferriero Currently the List Messages API paginates through in a single ProcessElement Call. In the future if a bulk export API becomes available that would allow splitting on some dimension (e.g. create time), this should be refactored as a splittable DoFn. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9847) Verify If Triggering allows emitting eager results when processing a single element in HL7v2IO.
[ https://issues.apache.org/jira/browse/BEAM-9847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094947#comment-17094947 ] Jacob Ferriero commented on BEAM-9847: -- This test seems to confirm it is not possible to emit outputs of a processElement call before it has completed. [https://gist.github.com/jaketf/d3c2e70dde781bbb0ef1993446e34b71] > Verify If Triggering allows emitting eager results when processing a single > element in HL7v2IO. > --- > > Key: BEAM-9847 > URL: https://issues.apache.org/jira/browse/BEAM-9847 > Project: Beam > Issue Type: Task > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > Due to the nature of the HL7v2 API, HL7v2IO.ListHL7v2Messages follows the > pattern of [paginating through all ListMessages results in a single > ProcessElement call.|#diff-9cd595f078378218ccc01ce7e19ca766R447]] > > Upon testing with a customer against an HL7v2 store with 350k messages, we > observed that the ListMessages transform was not outputting any elements and was > "hanging" for a long time (which was assumed to be the single thread > paginating through all the results). > > We added the following triggering in hopes that it would emit early results: > {code:java} > .apply( > Window.into(new GlobalWindows()) > .triggering( > AfterWatermark.pastEndOfWindow() > .withEarlyFirings( > AfterProcessingTime.pastFirstElementInPane() > .plusDelayOf(Duration.standardSeconds(1)))) > .discardingFiredPanes()) > {code} > Our tests with this triggering seemed to indicate that it did not "hang" like > the first test and seemed to output a steadier stream of elements. > The reviewer states that bundles must be committed atomically, so no output > elements of a single processElement call can proceed to downstream stages > until all output elements for that call are ready. > There may be other things at play here. 
Will seek to reproduce in a way that > definitively confirms output elements can be eagerly output during the > execution of a single process element call before it completes. > CC: [~pabloem] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9847) Verify If Triggering allows emitting eager results when processing a single element in HL7v2IO.
Jacob Ferriero created BEAM-9847: Summary: Verify If Triggering allows emitting eager results when processing a single element in HL7v2IO. Key: BEAM-9847 URL: https://issues.apache.org/jira/browse/BEAM-9847 Project: Beam Issue Type: Task Components: io-java-gcp Reporter: Jacob Ferriero Assignee: Jacob Ferriero Due to the nature of the HL7v2 API, HL7v2IO.ListHL7v2Messages follows the pattern of [paginating through all ListMessages results in a single ProcessElement call.|#diff-9cd595f078378218ccc01ce7e19ca766R447]] Upon testing with a customer against an HL7v2 store with 350k messages, we observed that the ListMessages transform was not outputting any elements and was "hanging" for a long time (which was assumed to be the single thread paginating through all the results). We added the following triggering in hopes that it would emit early results: {code:java} .apply( Window.into(new GlobalWindows()) .triggering( AfterWatermark.pastEndOfWindow() .withEarlyFirings( AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardSeconds(1)))) .discardingFiredPanes()) {code} Our tests with this triggering seemed to indicate that it did not "hang" like the first test and seemed to output a steadier stream of elements. The reviewer states that bundles must be committed atomically, so no output elements of a single processElement call can proceed to downstream stages until all output elements for that call are ready. There may be other things at play here. Will seek to reproduce in a way that definitively confirms output elements can be eagerly output during the execution of a single processElement call before it completes. CC: [~pabloem] -- This message was sent by Atlassian Jira (v8.3.4#803005)
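The reviewer's point about atomic bundle commits can be illustrated with a toy model. The class below is purely hypothetical, not Beam internals: outputs produced inside a single processElement call are buffered and only become visible downstream when the whole bundle commits, which is why early firings cannot release results mid-call:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Beam internals) of atomic bundle commit: output() from
 * within a processElement body buffers elements; downstream stages see
 * nothing until commit() hands the whole buffer off at once.
 */
public class ToyBundle {
  private final List<String> buffered = new ArrayList<>();
  private final List<String> committed = new ArrayList<>();

  /** Called from within a processElement body; does NOT publish downstream. */
  public void output(String element) {
    buffered.add(element);
  }

  /** Atomic commit: all-or-nothing hand-off to downstream stages. */
  public void commit() {
    committed.addAll(buffered);
    buffered.clear();
  }

  /** What a downstream stage can observe. */
  public List<String> visibleDownstream() {
    return new ArrayList<>(committed);
  }
}
```

Under this model, paginating through 350k messages in one processElement call keeps every page in the buffer until the call (and its bundle) finishes, matching the observed "hang".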
[jira] [Assigned] (BEAM-9831) HL7v2IO Improvements
[ https://issues.apache.org/jira/browse/BEAM-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero reassigned BEAM-9831: Assignee: Jacob Ferriero > HL7v2IO Improvements > > > Key: BEAM-9831 > URL: https://issues.apache.org/jira/browse/BEAM-9831 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Major > > # HL7v2MessageCoder constructor should be public for use by end users > # Currently HL7v2IO.ListHL7v2Messages blocks on pagination through list > messages results before emitting any output data elements (due to high fan-out > from a single input element). We should add early firings so that > downstream processing can proceed on early pages while later pages are still > being scrolled through. > # We should drop all output-only fields of HL7v2Message and keep only data > and labels when calling ingestMessages, rather than expecting the user to do > this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9831) HL7v2IO Improvements
Jacob Ferriero created BEAM-9831: Summary: HL7v2IO Improvements Key: BEAM-9831 URL: https://issues.apache.org/jira/browse/BEAM-9831 Project: Beam Issue Type: Bug Components: io-java-gcp Reporter: Jacob Ferriero # HL7v2MessageCoder constructor should be public for use by end users # Currently HL7v2IO.ListHL7v2Messages blocks on pagination through list messages results before emitting any output data elements (due to high fan-out from a single input element). We should add early firings so that downstream processing can proceed on early pages while later pages are still being scrolled through. # We should drop all output-only fields of HL7v2Message and keep only data and labels when calling ingestMessages, rather than expecting the user to do this. -- This message was sent by Atlassian Jira (v8.3.4#803005)
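Improvement #3 could be sketched roughly as follows. The map-based message stand-in and the helper name are assumptions for illustration, not the real HL7v2Message model; the real resource has output-only fields such as name and createTime that the Healthcare API rejects on ingest:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of improvement #3: before calling ingestMessages,
 * keep only the user-settable "data" and "labels" fields and drop
 * output-only fields, rather than expecting the user to do this.
 */
public class IngestSanitizer {
  /** Toy stand-in: the HL7v2 message resource modeled as a field map. */
  public static Map<String, Object> stripOutputOnlyFields(Map<String, Object> message) {
    Map<String, Object> ingestible = new HashMap<>();
    if (message.containsKey("data")) {
      ingestible.put("data", message.get("data"));
    }
    if (message.containsKey("labels")) {
      ingestible.put("labels", message.get("labels"));
    }
    return ingestible; // output-only fields like "name" and "createTime" are dropped
  }
}
```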
[jira] [Commented] (BEAM-9779) HL7v2IOWriteIT is flaky
[ https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086063#comment-17086063 ] Jacob Ferriero commented on BEAM-9779: -- Working theory: I wasn't able to reproduce locally because I have higher latency to the HL7v2 store in us-central1 than the Jenkins VMs do. > HL7v2IOWriteIT is flaky > --- > > Key: BEAM-9779 > URL: https://issues.apache.org/jira/browse/BEAM-9779 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > There seems to be a race condition somewhere in HL7v2IOWriteIT that causes > flakiness. > https://builds.apache.org/job/beam_PostCommit_Java/5947/ > https://builds.apache.org/job/beam_PostCommit_Java/5943/ > https://builds.apache.org/job/beam_PostCommit_Java/5942/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (BEAM-9779) HL7v2IOWriteIT is flaky
[ https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086057#comment-17086057 ] Jacob Ferriero edited comment on BEAM-9779 at 4/17/20, 8:44 PM: Seems that the HL7v2 store is indexed asynchronously. Will add a sleep for 5s to stabilize this test. was (Author: data-runner0): Seems that the HL7v2 store is indexed asynchronously. Will add a sleep for 10s to stabilize this test. > HL7v2IOWriteIT is flaky > --- > > Key: BEAM-9779 > URL: https://issues.apache.org/jira/browse/BEAM-9779 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > There seems to be a race condition somewhere in HL7v2IOWriteIT that causes > flakiness. > https://builds.apache.org/job/beam_PostCommit_Java/5947/ > https://builds.apache.org/job/beam_PostCommit_Java/5943/ > https://builds.apache.org/job/beam_PostCommit_Java/5942/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9779) HL7v2IOWriteIT is flaky
[ https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086057#comment-17086057 ] Jacob Ferriero commented on BEAM-9779: -- Seems that the HL7v2 store is indexed asynchronously. Will add a sleep for 10s to stabilize this test. > HL7v2IOWriteIT is flaky > --- > > Key: BEAM-9779 > URL: https://issues.apache.org/jira/browse/BEAM-9779 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > There seems to be a race condition somewhere in HL7v2IOWriteIT that causes > flakiness. > https://builds.apache.org/job/beam_PostCommit_Java/5947/ > https://builds.apache.org/job/beam_PostCommit_Java/5943/ > https://builds.apache.org/job/beam_PostCommit_Java/5942/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
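For an asynchronously indexed store, a poll-with-deadline is often more robust in an IT than a fixed sleep. A hedged sketch follows; the helper name and poll interval are invented for illustration, and the supplier stands in for a real "list messages and count" call:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.LongSupplier;

/**
 * Sketch of a poll-until-indexed helper as an alternative to a fixed sleep:
 * re-check the observed count until it matches the expected count or the
 * deadline passes, tolerating asynchronous indexing of the store.
 */
public class EventuallyConsistentAssert {
  public static boolean awaitCount(LongSupplier countSupplier, long expected, Duration timeout) {
    Instant deadline = Instant.now().plus(timeout);
    while (Instant.now().isBefore(deadline)) {
      if (countSupplier.getAsLong() == expected) {
        return true; // store has caught up with asynchronous indexing
      }
      try {
        Thread.sleep(50); // short poll interval instead of one long fixed sleep
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return countSupplier.getAsLong() == expected; // one last check at the deadline
  }
}
```

This bounds the worst-case wait at the timeout while letting fast runs finish as soon as the count matches, instead of always paying the full sleep.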
[jira] [Commented] (BEAM-9779) HL7v2IOWriteIT is flaky
[ https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086053#comment-17086053 ] Jacob Ferriero commented on BEAM-9779: -- I've been able to run this test many times locally and have been unable to reproduce the failure. It blocks on a pipeline run that writes messages to an HL7v2 store and then lists messages to verify count in = count out. The failure pattern seems to be that, intermittently, fewer messages than expected are written to the HL7v2 store. > HL7v2IOWriteIT is flaky > --- > > Key: BEAM-9779 > URL: https://issues.apache.org/jira/browse/BEAM-9779 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > There seems to be a race condition somewhere in HL7v2IOWriteIT that causes > flakiness. > https://builds.apache.org/job/beam_PostCommit_Java/5947/ > https://builds.apache.org/job/beam_PostCommit_Java/5943/ > https://builds.apache.org/job/beam_PostCommit_Java/5942/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-9779) HL7v2IOWriteIT is flaky
[ https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jacob Ferriero reassigned BEAM-9779: Assignee: Jacob Ferriero > HL7v2IOWriteIT is flaky > --- > > Key: BEAM-9779 > URL: https://issues.apache.org/jira/browse/BEAM-9779 > Project: Beam > Issue Type: Bug > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: Minor > > There seems to be a race condition somewhere in HL7v2IOWriteIT that causes > flakiness. > https://builds.apache.org/job/beam_PostCommit_Java/5947/ > https://builds.apache.org/job/beam_PostCommit_Java/5943/ > https://builds.apache.org/job/beam_PostCommit_Java/5942/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9779) HL7v2IOWriteIT is flaky
Jacob Ferriero created BEAM-9779: Summary: HL7v2IOWriteIT is flaky Key: BEAM-9779 URL: https://issues.apache.org/jira/browse/BEAM-9779 Project: Beam Issue Type: Bug Components: io-java-gcp Reporter: Jacob Ferriero There seems to be a race condition somewhere in HL7v2IOWriteIT that causes flakiness. https://builds.apache.org/job/beam_PostCommit_Java/5947/ https://builds.apache.org/job/beam_PostCommit_Java/5943/ https://builds.apache.org/job/beam_PostCommit_Java/5942/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057511#comment-17057511 ] Jacob Ferriero commented on BEAM-9468: -- https://github.com/apache/beam/pull/11107 > Add Google Cloud Healthcare API IO Connectors > - > > Key: BEAM-9468 > URL: https://issues.apache.org/jira/browse/BEAM-9468 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Jacob Ferriero >Priority: Minor > Time Spent: 20m > Remaining Estimate: 0h > > Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud > Healthcare API|https://cloud.google.com/healthcare/docs/] > HL7v2IO > FHIRIO > DICOM -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056127#comment-17056127 ] Jacob Ferriero commented on BEAM-9468: -- I am starting a prototype on this work. PR to come in a week or so. > Add Google Cloud Healthcare API IO Connectors > - > > Key: BEAM-9468 > URL: https://issues.apache.org/jira/browse/BEAM-9468 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Jacob Ferriero >Priority: Minor > > Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud > Healthcare API|https://cloud.google.com/healthcare/docs/] > HL7v2IO > FHIRIO > DICOM -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
Jacob Ferriero created BEAM-9468: Summary: Add Google Cloud Healthcare API IO Connectors Key: BEAM-9468 URL: https://issues.apache.org/jira/browse/BEAM-9468 Project: Beam Issue Type: New Feature Components: io-java-gcp Reporter: Jacob Ferriero Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud Healthcare API|https://cloud.google.com/healthcare/docs/] HL7v2IO FHIRIO DICOM -- This message was sent by Atlassian Jira (v8.3.4#803005)