[jira] [Created] (BEAM-10544) Select Types not equal with nested schema

2020-07-21 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10544:
-

 Summary: Select Types not equal with nested schema
 Key: BEAM-10544
 URL: https://issues.apache.org/jira/browse/BEAM-10544
 Project: Beam
  Issue Type: Bug
  Components: dsl-sql, sdk-java-core
Reporter: Jacob Ferriero


When using SqlTransform to join a table with a large nested schema to a flat 
table, we get a "Types not equal" error from Select [1].

We are not able to get the test of our use of SqlTransform to pass with the 
direct runner. All code is checked into CSR [2].

Things of note:
Calcite Query Planner

Query (the real business logic was much more complex, but this is sufficient to 
reproduce the issue in our test):
```sql
SELECT
  t1.DeviceName AS DeviceName,
  t1.LinkName AS LinkName,
  t1.HostName AS HostName,
  t1.MeasuredAt AS MeasuredAt,
  t2.b_dBm AS b_dBm
FROM
  RealtimeRows AS t1
  INNER JOIN
  -- BigQuery Dimension Side Input
  TxPowerSideInput AS t2
  ON
  t1.DeviceName = t2.DeviceName
```

Tables created like so (though in real tive )
```java
// This table has the same schema as the incoming Pub/Sub messages
// in the real-world use case.
PCollection<Row> realtimeTestData = pipeline
    .apply("Read 1Hz staging",
        BigQueryIO
            .readTableRowsWithSchema()
            .fromQuery(
                "SELECT * FROM `taara-db.jake_views.staging_sample_float`")
            .usingStandardSql())
    .apply(Convert.toRows());

// Dimension table that is joined to the realtime rows.
PCollection<Row> txPowerCalcRows = pipeline
    .apply("Read Tx Power Calc Side Input",
        BigQueryIO
            .readTableRowsWithSchema()
            .fromQuery(
                "SELECT * FROM `taara-db`.MANUFACTURING.tx_power_timeinvariant_calculations")
            .usingStandardSql())
    .apply(Convert.toRows());
```

Relevant java snippet
```java 
PCollection<Row> out = tables
    .apply(
        "Join to dimension Data",
        SqlTransform
            .query(sql)
            .registerUdf("POW", Pow.class)
            .registerUdf("SQRT", Sqrt.class)
            .registerUdf("LOG10", Log10.class)
            .registerUdf("GREATEST", Greatest.class)
            .registerUdf("EXTRACT_OFFSET", ExtractArrayOffset.class)
            .registerUdf("PARSE_TIMESTAMP", ParseTimestamp.class)
            .registerUdf("UNIX_SECONDS", UnixSeconds.class)
    );
```

[1] 
https://github.com/apache/beam/blob/b564239081e9351c56fb0e7d263495b95dd3f8f3/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Select.java#L203
[2] 
https://source.cloud.google.com/taara-db/pso-taara-realtime-margin/+/master:streaming-join/streaming-join/src/test/java/com/google/x/taara/dataflow/transforms/RxTxPowersCorrFERCombinedSqlTransformIT.java




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (BEAM-10479) UDF / UDAF support for ZetaSQLQueryPlanner

2020-07-14 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-10479:
--
Issue Type: New Feature  (was: Test)

> UDF / UDAF support for ZetaSQLQueryPlanner
> --
>
> Key: BEAM-10479
> URL: https://issues.apache.org/jira/browse/BEAM-10479
> Project: Beam
>  Issue Type: New Feature
>  Components: dsl-sql-zetasql
>Reporter: Jacob Ferriero
>Priority: P2
>
> [BeamSqlDslUdfUdafTest | 
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java]
>  should be refactored so each test runs against all supported Query Planners 
> (namely Calcite and Zeta SQL).
> This could be achieved without code duplication by using Parameterized tests 
> and having each test run with both query planners (and easily support adding 
> more QueryPlanners in the future if necessary).
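> {code:java}
> import java.util.Arrays;
> import java.util.Collection;
> import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner;
> import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
> import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> import org.junit.runners.Parameterized.Parameters;
> @RunWith(Parameterized.class)
> public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
>   // Planner class under test; injected once per parameterized run.
>   private final Class<? extends QueryPlanner> planner;
>   @Parameters(name = "{0}")
>   public static Collection<Class<? extends QueryPlanner>> planners() {
>     return Arrays.asList(
>         CalciteQueryPlanner.class,
>         ZetaSQLQueryPlanner.class);
>   }
>   public BeamSqlDslUdfUdafTest(Class<? extends QueryPlanner> planner) {
>     this.planner = planner;
>   }
>   // TODO refactor each test that runs SqlTransform::query to use
>   // SqlTransform::withQueryPlannerClass(this.planner)
>   ...
> }
> {code}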





[jira] [Updated] (BEAM-10479) UDF / UDAF support for ZetaSQLQueryPlanner

2020-07-14 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-10479:
--
Summary: UDF / UDAF support for ZetaSQLQueryPlanner  (was: UDF / UDAF 
support should be tested against ZetaSQLQueryPlanner)

> UDF / UDAF support for ZetaSQLQueryPlanner
> --
>
> Key: BEAM-10479
> URL: https://issues.apache.org/jira/browse/BEAM-10479
> Project: Beam
>  Issue Type: Test
>  Components: dsl-sql-zetasql
>Reporter: Jacob Ferriero
>Priority: P2
>
> [BeamSqlDslUdfUdafTest | 
> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java]
>  should be refactored so each test runs against all supported Query Planners 
> (namely Calcite and Zeta SQL).
> This could be achieved without code duplication by using Parameterized tests 
> and having each test run with both query planners (and easily support adding 
> more QueryPlanners in the future if necessary).
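> {code:java}
> import java.util.Arrays;
> import java.util.Collection;
> import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner;
> import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
> import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> import org.junit.runners.Parameterized.Parameters;
> @RunWith(Parameterized.class)
> public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
>   // Planner class under test; injected once per parameterized run.
>   private final Class<? extends QueryPlanner> planner;
>   @Parameters(name = "{0}")
>   public static Collection<Class<? extends QueryPlanner>> planners() {
>     return Arrays.asList(
>         CalciteQueryPlanner.class,
>         ZetaSQLQueryPlanner.class);
>   }
>   public BeamSqlDslUdfUdafTest(Class<? extends QueryPlanner> planner) {
>     this.planner = planner;
>   }
>   // TODO refactor each test that runs SqlTransform::query to use
>   // SqlTransform::withQueryPlannerClass(this.planner)
>   ...
> }
> {code}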





[jira] [Created] (BEAM-10488) ZetaSQL OFFSET support for retrieving Array Values

2020-07-14 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10488:
-

 Summary: ZetaSQL OFFSET support for retrieving Array Values
 Key: BEAM-10488
 URL: https://issues.apache.org/jira/browse/BEAM-10488
 Project: Beam
  Issue Type: Test
  Components: dsl-sql-zetasql
Reporter: Jacob Ferriero


Without OFFSET support, arrays cannot be used for anything other than passing 
them through a simple SELECT with the ZetaSQL planner.
https://github.com/google/zetasql/blob/master/docs/arrays.md#accessing-array-elements





[jira] [Created] (BEAM-10479) UDF / UDAF support should be tested against ZetaSQLQueryPlanner

2020-07-13 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10479:
-

 Summary: UDF / UDAF support should be tested against 
ZetaSQLQueryPlanner
 Key: BEAM-10479
 URL: https://issues.apache.org/jira/browse/BEAM-10479
 Project: Beam
  Issue Type: Test
  Components: dsl-sql-zetasql
Reporter: Jacob Ferriero


[BeamSqlDslUdfUdafTest | 
https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java]
 should be refactored so each test runs against all supported Query Planners 
(namely Calcite and Zeta SQL).

This could be achieved without code duplication by using Parameterized tests 
and having each test run with both query planners (and easily support adding 
more QueryPlanners in the future if necessary).



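{code:java}
import java.util.Arrays;
import java.util.Collection;
import org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner;
import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner;
import org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {

  // Planner class under test; injected once per parameterized run.
  private final Class<? extends QueryPlanner> planner;

  @Parameters(name = "{0}")
  public static Collection<Class<? extends QueryPlanner>> planners() {
    return Arrays.asList(
        CalciteQueryPlanner.class,
        ZetaSQLQueryPlanner.class);
  }

  public BeamSqlDslUdfUdafTest(Class<? extends QueryPlanner> planner) {
    this.planner = planner;
  }

  // TODO refactor each test that runs SqlTransform::query to use
  // SqlTransform::withQueryPlannerClass(this.planner)
  ...
}
{code}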





[jira] [Created] (BEAM-10478) Beam Zeta SQL docs should call out additional dependency

2020-07-13 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10478:
-

 Summary: Beam Zeta SQL docs should call out additional dependency
 Key: BEAM-10478
 URL: https://issues.apache.org/jira/browse/BEAM-10478
 Project: Beam
  Issue Type: New Feature
  Components: dsl-sql-zetasql
Reporter: Jacob Ferriero


The documentation should explain that using Beam ZetaSQL requires both
beam-sdks-java-extensions-sql
and
beam-sdks-java-extensions-sql-zetasql.

 [This docs page | https://beam.apache.org/documentation/dsls/sql/overview/] is 
a bit misleading, as you will get a ClassNotFoundException if you try to use 
setPlannerName without this additional dependency installed.





[jira] [Updated] (BEAM-10473) RowJson should support DATETIME

2020-07-13 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-10473:
--
Description: 
Add support for DATETIME types to support adding an 
 interface to JsonToRow that accepts a joda DateTimeFormatter for parsing 
DateTime JSON string fields or specifies a behavior for treating numbers as 
unix timestamps.

This is crucial for SQL pipelines processing streaming data with timestamps.

The interface might look something like this:
{code:java}
/** For parsing JSON string fields containing dates to DATETIME in the target 
schema */
JsonToRow::withDateTimeFormatter(org.joda.time.format.DateTimeFormatter)

/** For parsing JSON number fields containing milliseconds since UNIX epoch 
that are listed as DATETIME in the target schema*/
JsonToRow::usingUnixMillis()

/** For parsing JSON number fields containing seconds since UNIX epoch that are 
listed as DATETIME in the target schema*/
JsonToRow::usingUnixSeconds()
{code}

  was:
Add support for DATETIME types by adding an 
 interface to JsonToRow that accepts a joda DateTimeFormatter for parsing 
DateTime JSON string fields or specifies a behavior for treating numbers as 
unix timestamps.

The interface might look something like this:
{code:java}
/** For parsing JSON string fields containing dates to DATETIME in the target 
schema */
JsonToRow::withDateTimeFormatter(org.joda.time.format.DateTimeFormatter)

/** For parsing JSON number fields containing milliseconds since UNIX epoch 
that are listed as DATETIME in the target schema*/
JsonToRow::usingUnixMillis()

/** For parsing JSON number fields containing seconds since UNIX epoch that are 
listed as DATETIME in the target schema*/
JsonToRow::usingUnixSeconds()
{code}


> RowJson should support DATETIME
> ---
>
> Key: BEAM-10473
> URL: https://issues.apache.org/jira/browse/BEAM-10473
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Jacob Ferriero
>Priority: P2
>
> Add support for DATETIME types to support adding an 
>  interface to JsonToRow that accepts a joda DateTimeFormatter for parsing 
> DateTime JSON string fields or specifies a behavior for treating numbers as 
> unix timestamps.
> This is crucial for SQL pipelines processing streaming data with timestamps.
> The interface might look something like this:
> {code:java}
> /** For parsing JSON string fields containing dates to DATETIME in the target 
> schema */
> JsonToRow::withDateTimeFormatter(org.joda.time.format.DateTimeFormatter)
> /** For parsing JSON number fields containing milliseconds since UNIX epoch 
> that are listed as DATETIME in the target schema*/
> JsonToRow::usingUnixMillis()
> /** For parsing JSON number fields containing seconds since UNIX epoch that 
> are listed as DATETIME in the target schema*/
> JsonToRow::usingUnixSeconds()
> {code}





[jira] [Updated] (BEAM-10473) RowJson should support DATETIME

2020-07-13 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-10473:
--
Summary: RowJson should support DATETIME  (was: JsonToRow should support 
DATETIME)

> RowJson should support DATETIME
> ---
>
> Key: BEAM-10473
> URL: https://issues.apache.org/jira/browse/BEAM-10473
> Project: Beam
>  Issue Type: New Feature
>  Components: sdk-java-core
>Reporter: Jacob Ferriero
>Priority: P2
>
> Add support for DATETIME types by adding an 
>  interface to JsonToRow that accepts a joda DateTimeFormatter for parsing 
> DateTime JSON string fields or specifies a behavior for treating numbers as 
> unix timestamps.
> The interface might look something like this:
> {code:java}
> /** For parsing JSON string fields containing dates to DATETIME in the target 
> schema */
> JsonToRow::withDateTimeFormatter(org.joda.time.format.DateTimeFormatter)
> /** For parsing JSON number fields containing milliseconds since UNIX epoch 
> that are listed as DATETIME in the target schema*/
> JsonToRow::usingUnixMillis()
> /** For parsing JSON number fields containing seconds since UNIX epoch that 
> are listed as DATETIME in the target schema*/
> JsonToRow::usingUnixSeconds()
> {code}





[jira] [Created] (BEAM-10473) JsonToRow should support DATETIME

2020-07-13 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10473:
-

 Summary: JsonToRow should support DATETIME
 Key: BEAM-10473
 URL: https://issues.apache.org/jira/browse/BEAM-10473
 Project: Beam
  Issue Type: New Feature
  Components: sdk-java-core
Reporter: Jacob Ferriero


Add support for DATETIME types by adding an interface to JsonToRow that either 
accepts a Joda DateTimeFormatter for parsing DateTime JSON string fields or 
specifies a behavior for treating numbers as Unix timestamps.

The interface might look something like this:
{code:java}
/** For parsing JSON string fields containing dates to DATETIME in the target 
schema */
JsonToRow::withDateTimeFormatter(org.joda.time.format.DateTimeFormatter)

/** For parsing JSON number fields containing milliseconds since UNIX epoch 
that are listed as DATETIME in the target schema*/
JsonToRow::usingUnixMillis()

/** For parsing JSON number fields containing seconds since UNIX epoch that are 
listed as DATETIME in the target schema*/
JsonToRow::usingUnixSeconds()
{code}
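
A minimal sketch of the three proposed parsing behaviors, written as plain functions. It is an illustration only: it uses java.time rather than Joda-Time so that it runs on the JDK alone, the class and method names (DateTimeFieldParsers, fromString, fromUnixMillis, fromUnixSeconds) are hypothetical and not part of Beam, and treating strings without a zone as UTC is an assumption.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper, not part of Beam. Mirrors the three proposed JsonToRow behaviors. */
final class DateTimeFieldParsers {
  private DateTimeFieldParsers() {}

  /** JSON string field parsed with a caller-supplied formatter (java.time here, Joda in the proposal). */
  static Instant fromString(String value, DateTimeFormatter formatter) {
    // Assumption: strings without an explicit zone are interpreted as UTC.
    return LocalDateTime.parse(value, formatter).toInstant(ZoneOffset.UTC);
  }

  /** JSON number field holding milliseconds since the UNIX epoch. */
  static Instant fromUnixMillis(long millis) {
    return Instant.ofEpochMilli(millis);
  }

  /** JSON number field holding seconds since the UNIX epoch. */
  static Instant fromUnixSeconds(long seconds) {
    return Instant.ofEpochSecond(seconds);
  }

  public static void main(String[] args) {
    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // All three calls describe the same point in time.
    System.out.println(fromString("2020-07-13 12:00:00", fmt));
    System.out.println(fromUnixMillis(1594641600000L));
    System.out.println(fromUnixSeconds(1594641600L));
  }
}
```

Keeping the three behaviors as separate entry points matches the proposal: the caller states up front how a DATETIME field is encoded, so the parser never has to guess whether a number means seconds or milliseconds.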





[jira] [Created] (BEAM-10437) Add zeta SQL GREATEST support

2020-07-09 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10437:
-

 Summary: Add zeta SQL GREATEST support
 Key: BEAM-10437
 URL: https://issues.apache.org/jira/browse/BEAM-10437
 Project: Beam
  Issue Type: New Feature
  Components: dsl-sql-zetasql
Reporter: Jacob Ferriero








[jira] [Created] (BEAM-10436) Add zeta SQL LOG / LOG10 support

2020-07-09 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10436:
-

 Summary: Add zeta SQL LOG / LOG10 support
 Key: BEAM-10436
 URL: https://issues.apache.org/jira/browse/BEAM-10436
 Project: Beam
  Issue Type: New Feature
  Components: dsl-sql-zetasql
Reporter: Jacob Ferriero








[jira] [Commented] (BEAM-10419) Flaky FhirIOWriteIT integraion test in java postcommit

2020-07-07 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-10419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17153159#comment-17153159
 ] 

Jacob Ferriero commented on BEAM-10419:
---

It seems that the logic that moves files to "claim" them for a batch import in 
FhirIO.Import sometimes tries to copy a file that has already been deleted.

For the sake of explanation (and to refresh my own memory), FhirIO.Import works 
as follows:
# Buffer the elements in a bundle to a newline-delimited JSON file in the tempDir
# Group these files into batches (of 10,000 files); this avoids running many 
load jobs with small batches
# Copy each batch of files to a unique temporary subdirectory, let's call it a 
batchDir (this is used to specify a prefix + wildcard for each load job), in 
[ImportFn|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java#L1033-L1040]
# Start the load job and block until completion (this deletes the files in the batchDir 
[here|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java#L1051]
 and the corresponding files in the original dir 
[here|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java#L1065])
# Delete all the (original) files in the tempDir once the window is closed 
(based on this [usage of 
Wait.on|https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java#L840-L870])

Background on why this logic is so complicated can be found in this review 
[thread|https://github.com/apache/beam/pull/11339#discussion_r423357007]

The flakiness seems to occur when attempting the copy in step 3 on a file that 
does not exist. My hunch is that the deletion in step 5 may not be properly 
waiting for the ImportFn to complete.

Could this happen if the ImportFn task is rescheduled for a particular batch?
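
The hypothesized failure mode can be reproduced in miniature on a local filesystem. The sketch below is only an illustration under assumptions: it uses java.nio.file instead of GCS and Beam's FileSystems, and the class and method names (CopyAfterDeleteSketch, claim) are hypothetical. It shows that a retried copy fails once cleanup has already deleted the original file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

/** Local-filesystem illustration only; FhirIO itself copies GCS objects. */
final class CopyAfterDeleteSketch {
  private CopyAfterDeleteSketch() {}

  /**
   * Tries to "claim" a file by copying it into batchDir (step 3).
   * Returns false when the source file no longer exists.
   */
  static boolean claim(Path source, Path batchDir) throws IOException {
    try {
      Files.copy(source, batchDir.resolve(source.getFileName()));
      return true;
    } catch (NoSuchFileException e) {
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    Path tempDir = Files.createTempDirectory("tempDir");
    Path batchDir = Files.createTempDirectory("batchDir");
    Path file = Files.createFile(tempDir.resolve("fhirImportBatch-example.ndjson"));

    System.out.println("first attempt claimed: " + claim(file, batchDir));

    // Stand-ins for the cleanup of the batchDir copy and of the original file.
    Files.delete(batchDir.resolve(file.getFileName()));
    Files.delete(file);

    // A rescheduled attempt now finds nothing to copy.
    System.out.println("retry claimed: " + claim(file, batchDir));
  }
}
```

If the real flake follows this pattern, either the cleanup needs to wait for every attempt of the import step, or the copy needs to tolerate files that an earlier attempt already processed. Which of the two applies is not established here.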

FYI I do not have bandwidth to dig deeper into this / fix this right now and 
[~lastomato] has taken over maintenance of these IO connectors as I am not 
currently engaged w/ Healthcare customers.

{noformat}
Caused by: java.io.IOException: Error executing batch GCS request
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.executeBatches(GcsUtil.java:617)
    at org.apache.beam.sdk.extensions.gcp.util.GcsUtil.copy(GcsUtil.java:718)
    at org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystem.copy(GcsFileSystem.java:168)
    at org.apache.beam.sdk.io.FileSystems.copy(FileSystems.java:289)
    at org.apache.beam.sdk.io.gcp.healthcare.FhirIO$Import$ImportFn.importBatch(FhirIO.java:1040)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Error trying to rewrite gs://temp-storage-for-healthcare-io-tests/fhirImportBatch-60962352-ef3c-4d42-8079-c17ac430ebd6.ndjson to gs://temp-storage-for-healthcare-io-tests/tmp-22b3fd94-1455-42b0-b24a-55439eff5647/fhirImportBatch-60962352-ef3c-4d42-8079-c17ac430ebd6.ndjson: {"code":404,"errors":[{"domain":"global","message":"No such object: temp-storage-for-healthcare-io-tests/fhirImportBatch-60962352-ef3c-4d42-8079-c17ac430ebd6.ndjson","reason":"notFound"}],"message":"No such object: temp-storage-for-healthcare-io-tests/fhirImportBatch-60962352-ef3c-4d42-8079-c17ac430ebd6.ndjson"}
{noformat}


> Flaky FhirIOWriteIT integraion test in java postcommit
> --
>
> Key: BEAM-10419
> URL: https://issues.apache.org/jira/browse/BEAM-10419
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Yichi Zhang
>Assignee: Jacob Ferriero
>Priority: P1
>  Labels: flake
>
> See history 
> https://ci-beam.apache.org/job/beam_PostCommit_Java/6320/testReport/junit/org.apache.beam.sdk.io.gcp.healthcare/FhirIOWriteIT/history/?start=25





[jira] [Created] (BEAM-10394) Add zeta SQL SQRT support

2020-06-30 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10394:
-

 Summary: Add zeta SQL SQRT support
 Key: BEAM-10394
 URL: https://issues.apache.org/jira/browse/BEAM-10394
 Project: Beam
  Issue Type: New Feature
  Components: dsl-sql-zetasql
Reporter: Jacob Ferriero








[jira] [Created] (BEAM-10393) Add zeta SQL POW support

2020-06-30 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10393:
-

 Summary: Add zeta SQL POW support
 Key: BEAM-10393
 URL: https://issues.apache.org/jira/browse/BEAM-10393
 Project: Beam
  Issue Type: New Feature
  Components: dsl-sql-zetasql
Reporter: Jacob Ferriero








[jira] [Updated] (BEAM-10061) ReadAllFromTextWithFilename

2020-06-04 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-10061:
--
Component/s: (was: io-py-gcp)
 sdk-java-core
 io-py-files
 io-java-files

> ReadAllFromTextWithFilename
> ---
>
> Key: BEAM-10061
> URL: https://issues.apache.org/jira/browse/BEAM-10061
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-files, io-py-files, sdk-java-core, sdk-py-core
> Environment: Dataflow with Python
>Reporter: Ryan Canty
>Priority: P2
>
> I am trying to create a job that reads from GCS, executes some code against 
> each line, and creates a PCollection with the line and the file. So basically 
> what I'd like is a combination of textio.ReadFromTextWithFilename and 
> textio.ReadAllFromText.





[jira] [Commented] (BEAM-10061) ReadAllFromTextWithFilename

2020-06-04 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-10061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126295#comment-17126295
 ] 

Jacob Ferriero commented on BEAM-10061:
---

This would also be useful in the Java SDK, as that's what I typically see 
customers using. (I added the Java components to this issue.)

> ReadAllFromTextWithFilename
> ---
>
> Key: BEAM-10061
> URL: https://issues.apache.org/jira/browse/BEAM-10061
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-files, io-py-files, sdk-java-core, sdk-py-core
> Environment: Dataflow with Python
>Reporter: Ryan Canty
>Priority: P2
>
> I am trying to create a job that reads from GCS, executes some code against 
> each line, and creates a PCollection with the line and the file. So basically 
> what I'd like is a combination of textio.ReadFromTextWithFilename and 
> textio.ReadAllFromText.





[jira] [Created] (BEAM-10175) FhirIO execute bundle uses deprecated auth uri param

2020-06-02 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10175:
-

 Summary: FhirIO execute bundle uses deprecated auth uri param
 Key: BEAM-10175
 URL: https://issues.apache.org/jira/browse/BEAM-10175
 Project: Beam
  Issue Type: Improvement
  Components: io-java-gcp
Affects Versions: 2.22.0
Reporter: Jacob Ferriero
Assignee: Jacob Ferriero








[jira] [Assigned] (BEAM-10141) HL7v2IO read methods should assign sendTime timestamps

2020-05-29 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero reassigned BEAM-10141:
-

Assignee: Jacob Ferriero

> HL7v2IO read methods should assign sendTime timestamps
> --
>
> Key: BEAM-10141
> URL: https://issues.apache.org/jira/browse/BEAM-10141
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: P3
>
> Per the suggestion in this 
> [conversation|https://github.com/apache/beam/pull/11596#discussion_r427633240],
> add timestamped values and a watermark estimate to
> HL7v2IO.ListMessages. The same argument can be made for making HL7v2IO.Read 
> return timestamped values.
> This should be optional and default to false so that it does not break the 
> interface.





[jira] [Created] (BEAM-10141) HL7v2IO read methods should assign sendTime timestamps

2020-05-28 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10141:
-

 Summary: HL7v2IO read methods should assign sendTime timestamps
 Key: BEAM-10141
 URL: https://issues.apache.org/jira/browse/BEAM-10141
 Project: Beam
  Issue Type: Improvement
  Components: io-java-gcp
Reporter: Jacob Ferriero


Per the suggestion in this 
[conversation|https://github.com/apache/beam/pull/11596#discussion_r427633240],
add timestamped values and a watermark estimate to
HL7v2IO.ListMessages. The same argument can be made for making HL7v2IO.Read 
return timestamped values.

This should be optional and default to false so that it does not break the 
interface.





[jira] [Resolved] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization

2020-05-21 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero resolved BEAM-9856.
--
Fix Version/s: 2.22.0
   Resolution: Fixed

> HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
> --
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: P3
> Fix For: 2.22.0
>
>  Time Spent: 10.5h
>  Remaining Estimate: 0h
>
> Currently the List Messages API paginates through in a single ProcessElement 
> call.
> However, we could get a restriction based on createTime using the 
> Messages.List filter and orderby.
>
> This is in line with the future roadmap, in which an HL7v2 bulk export API 
> becomes available that should allow splitting (e.g. on the create time 
> dimension). Leveraging this bulk export might be a future optimization to 
> explore.
>
> This could take one of two forms:
> 1. Dynamically splittable via a splittable DoFn (sexy, Beam-idiomatic: makes 
> optimization the runner's problem, but potentially unnecessarily complex for 
> this use case)
> 2. Static splitting on some time partition, e.g. finding the earliest 
> createTime, emitting a PCollection of 1 hour partitions, and paginating 
> through each hour of data within the time frame that the store spans, in a 
> separate ProcessElement (easy to implement but will likely have hot keys / 
> stragglers based on "busy hours")
>  
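
The static splitting in option 2 of the description above could be sketched as follows. This is a hedged illustration, not the implementation that was merged: the class and method names (CreateTimePartitions, Range, split) are hypothetical, the ranges are half-open by assumption, and the per-range Messages.List pagination is left out.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that cuts a createTime span into fixed-width partitions. */
final class CreateTimePartitions {
  private CreateTimePartitions() {}

  /** Half-open range [start, end) of createTime values. */
  static final class Range {
    final Instant start;
    final Instant end;

    Range(Instant start, Instant end) {
      this.start = start;
      this.end = end;
    }
  }

  /** Splits [earliest, latest) into consecutive ranges of the given width; the last may be shorter. */
  static List<Range> split(Instant earliest, Instant latest, Duration width) {
    if (width.isZero() || width.isNegative()) {
      throw new IllegalArgumentException("width must be positive");
    }
    List<Range> ranges = new ArrayList<>();
    for (Instant start = earliest; start.isBefore(latest); start = start.plus(width)) {
      Instant end = start.plus(width);
      ranges.add(new Range(start, end.isAfter(latest) ? latest : end));
    }
    return ranges;
  }

  public static void main(String[] args) {
    Instant earliest = Instant.parse("2020-05-01T00:00:00Z");
    Instant latest = Instant.parse("2020-05-01T02:30:00Z");
    for (Range r : split(earliest, latest, Duration.ofHours(1))) {
      System.out.println(r.start + " to " + r.end);
    }
  }
}
```

Each range would then be paginated in its own ProcessElement call. As the description notes, fixed-width ranges carry uneven amounts of data when some hours are busier than others.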





[jira] [Resolved] (BEAM-9831) HL7v2IO Improvements

2020-05-21 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero resolved BEAM-9831.
--
Fix Version/s: 2.22.0
   Resolution: Fixed

> HL7v2IO Improvements
> 
>
> Key: BEAM-9831
> URL: https://issues.apache.org/jira/browse/BEAM-9831
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: P2
> Fix For: 2.22.0
>
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> # HL7v2MessageCoder constructor should be public for use by end users
>  # Currently HL7v2IO.ListHL7v2Messages blocks on pagination through list 
> messages results before emitting any output data elements (due to high fan 
> out from a single input element). We should add early firings so that 
> downstream processing can proceed on early pages while later pages are still 
> being scrolled through.
>  # We should drop all output only fields of HL7v2Message and only keep data 
> and labels when calling ingestMessages, rather than expecting the user to do 
> this.
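
Item 3 of the description above amounts to a whitelist on the message fields. A minimal sketch, assuming the message is held as a plain map of field names to values: the map representation and the names IngestPayloadSketch and keepDataAndLabels are hypothetical, and only the two field names "data" and "labels" come from the description.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper, not the HL7v2IO implementation. */
final class IngestPayloadSketch {
  private IngestPayloadSketch() {}

  /** Returns a copy of the message that keeps only the "data" and "labels" fields. */
  static Map<String, Object> keepDataAndLabels(Map<String, Object> message) {
    Map<String, Object> payload = new LinkedHashMap<>();
    for (String key : new String[] {"data", "labels"}) {
      if (message.containsKey(key)) {
        payload.put(key, message.get(key));
      }
    }
    return payload;
  }

  public static void main(String[] args) {
    Map<String, Object> message = new LinkedHashMap<>();
    message.put("name", "example-message-name");
    message.put("data", "base64-encoded-payload");
    message.put("createTime", "2020-05-21T00:00:00Z");
    System.out.println(keepDataAndLabels(message).keySet());
  }
}
```

Whitelisting the two writable fields, rather than listing the fields to remove, means the payload stays valid if further output-only fields are added to the message later.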





[jira] [Resolved] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors

2020-05-21 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero resolved BEAM-9468.
--
Resolution: Fixed

> Add Google Cloud Healthcare API IO Connectors
> -
>
> Key: BEAM-9468
> URL: https://issues.apache.org/jira/browse/BEAM-9468
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: P3
> Fix For: 2.22.0
>
>  Time Spent: 53.5h
>  Remaining Estimate: 0h
>
> Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud 
> Healthcare API|https://cloud.google.com/healthcare/docs/]
> HL7v2IO
> FHIRIO
> DICOM 





[jira] [Updated] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors

2020-05-21 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-9468:
-
Fix Version/s: 2.22.0

> Add Google Cloud Healthcare API IO Connectors
> -
>
> Key: BEAM-9468
> URL: https://issues.apache.org/jira/browse/BEAM-9468
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: P3
> Fix For: 2.22.0
>
>  Time Spent: 53.5h
>  Remaining Estimate: 0h
>
> Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud 
> Healthcare API|https://cloud.google.com/healthcare/docs/]
> HL7v2IO
> FHIRIO
> DICOM 





[jira] [Commented] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors

2020-05-21 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17113411#comment-17113411
 ] 

Jacob Ferriero commented on BEAM-9468:
--

Yes!

> Add Google Cloud Healthcare API IO Connectors
> -
>
> Key: BEAM-9468
> URL: https://issues.apache.org/jira/browse/BEAM-9468
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: P3
>  Time Spent: 53.5h
>  Remaining Estimate: 0h
>
> Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud 
> Healthcare API|https://cloud.google.com/healthcare/docs/]
> HL7v2IO
> FHIRIO
> DICOM 





[jira] [Updated] (BEAM-10040) TestPubsubSignal not signalling success w/ Dataflow Runner

2020-05-19 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-10040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-10040:
--
Issue Type: Bug  (was: Improvement)

> TestPubsubSignal not signalling success w/ Dataflow Runner
> --
>
> Key: BEAM-10040
> URL: https://issues.apache.org/jira/browse/BEAM-10040
> Project: Beam
>  Issue Type: Bug
>  Components: test-failures
>Reporter: Jacob Ferriero
>Priority: P2
>
> The issue with FhirIOReadIT seems to be some misuse of TestPubsubSignal
> [Example 
> Job|https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing]
>  clearly has the expected >2000 elements added to the "waitForAnyMessage" task
> but the success signal never gets published to the results topic.
> Notably there are job level warnings about metric descriptors and [warnings 
> in shuffle 
> logs|https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054&timestamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z]
>  which warns:
> "Update range task returned 'invalid argument'. Assuming lost lease for work 
> with id 5061980071068333770 (expiration time: 1589940982000, now: 
> 1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For 
> more information, see 
> https://cloud.google.com/dataflow/docs/guides/common-errors."





[jira] [Created] (BEAM-10040) TestPubsubSignal not signalling success w/ Dataflow Runner

2020-05-19 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-10040:
-

 Summary: TestPubsubSignal not signalling success w/ Dataflow Runner
 Key: BEAM-10040
 URL: https://issues.apache.org/jira/browse/BEAM-10040
 Project: Beam
  Issue Type: Improvement
  Components: test-failures
Reporter: Jacob Ferriero


The issue with FhirIOReadIT seems to be some misuse of TestPubsubSignal.
[Example 
Job|https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing]
 clearly has the expected >2000 elements added to the "waitForAnyMessage" task
but the success signal never gets published to the results topic.

Notably there are job level warnings about metric descriptors and [warnings in 
shuffle 
logs|https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054&timestamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z]
 which warns:

"Update range task returned 'invalid argument'. Assuming lost lease for work 
with id 5061980071068333770 (expiration time: 1589940982000, now: 
1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For more 
information, see https://cloud.google.com/dataflow/docs/guides/common-errors."





[jira] [Updated] (BEAM-9990) FhirIO should support conditional create / update methods

2020-05-13 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-9990:
-
Description: 
There are many use cases where it is expected that calling executeBundles in a 
distributed environment may fail (e.g. trying to create a resource that already 
exists). 
We should add classes to support the following methods as implementations of 
FhirIO.Write to provide more robust reconciliation strategies for Dead Letter 
Queues involving FhirIO.Write

https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/conditionalUpdate
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/update

> FhirIO should support conditional create / update methods
> -
>
> Key: BEAM-9990
> URL: https://issues.apache.org/jira/browse/BEAM-9990
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Major
>
> There are many use cases where it is expected that calling executeBundles in 
> a distributed environment may fail (e.g. trying to create a resource that 
> already exists). 
> We should add classes to support the following methods as implementations of 
> FhirIO.Write to provide more robust reconciliation strategies for Dead Letter 
> Queues involving FhirIO.Write
> https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/conditionalUpdate
> https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/create
> https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/update





[jira] [Created] (BEAM-9990) FhirIO should support conditional create / update methods

2020-05-13 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-9990:


 Summary: FhirIO should support conditional create / update methods
 Key: BEAM-9990
 URL: https://issues.apache.org/jira/browse/BEAM-9990
 Project: Beam
  Issue Type: Improvement
  Components: io-java-gcp
Reporter: Jacob Ferriero
Assignee: Jacob Ferriero








[jira] [Commented] (BEAM-9779) HL7v2IOWriteIT is flaky

2020-05-04 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099525#comment-17099525
 ] 

Jacob Ferriero commented on BEAM-9779:
--

[~chamikara] Have the changes in https://github.com/apache/beam/pull/11450 
stabilized this test? Should we close this?

> HL7v2IOWriteIT is flaky
> ---
>
> Key: BEAM-9779
> URL: https://issues.apache.org/jira/browse/BEAM-9779
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp, test-failures
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Critical
>  Time Spent: 4h 10m
>  Remaining Estimate: 0h
>
> There seems to be a race condition somewhere in HL7v2IOWriteIT that causes 
> flakiness.
> https://builds.apache.org/job/beam_PostCommit_Java/5947/
> https://builds.apache.org/job/beam_PostCommit_Java/5943/
> https://builds.apache.org/job/beam_PostCommit_Java/5942/





[jira] [Resolved] (BEAM-9847) Verify If Triggering allows emitting eager results when processing a single element in HL7v2IO.

2020-05-01 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero resolved BEAM-9847.
--
Fix Version/s: Not applicable
   Resolution: Not A Bug

> Verify If Triggering allows emitting eager results when processing a single 
> element in HL7v2IO.
> ---
>
> Key: BEAM-9847
> URL: https://issues.apache.org/jira/browse/BEAM-9847
> Project: Beam
>  Issue Type: Task
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
> Fix For: Not applicable
>
>
> Due to the nature of the HL7v2 API, HL7v2IO.ListHL7v2Messages follows the 
> pattern of [paginating through all ListMessages results in a single 
> ProcessElement call.|#diff-9cd595f078378218ccc01ce7e19ca766R447]]
>  
> Upon testing with a customer against an HL7v2 store with 350k messages, we 
> observed that the ListMessages transform was not outputting any elements and 
> appeared to be "hanging" for a long time (which we assumed was the single 
> thread paginating through all the results).
>  
> We added the following triggering in hopes that it would emit early results:
> {code:java}
>   .apply(
>   Window.into(new GlobalWindows())
>   .triggering(
>   AfterWatermark.pastEndOfWindow()
>   .withEarlyFirings(
>   AfterProcessingTime.pastFirstElementInPane()
>   .plusDelayOf(Duration.standardSeconds(1))))
>   .discardingFiredPanes())
> {code}
> Our tests with this triggering seemed to indicate that it did not "hang" like 
> the first test and seemed to output a more steady stream of elements.
> A reviewer states that bundles must be committed atomically, so no output 
> elements of a single ProcessElement call can proceed to downstream stages 
> until all output elements for that call are ready.
> There may be other things at play here. I will seek to reproduce this in a 
> way that definitively confirms whether output elements can be eagerly emitted 
> during the execution of a single ProcessElement call before it completes.
> CC: [~pabloem]
>  
>  





[jira] [Commented] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization

2020-05-01 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097785#comment-17097785
 ] 

Jacob Ferriero commented on BEAM-9856:
--

I have begun work on a POC for what this would look like as splittable DoFn in 
#11596.

> HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
> --
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Currently the List Messages API paginates through all results in a single 
> ProcessElement call.
> However, we could derive a restriction based on createTime using the 
> Messages.List filter and orderBy parameters.
>  
> This is in line with the future roadmap: once an HL7v2 bulk export API 
> becomes available, it should allow splitting on some dimension (e.g. create 
> time). Leveraging this bulk export might be a future optimization to explore.
>  
> This could take one of two forms:
> 1. Dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: makes 
> optimization the runner's problem, but potentially unnecessarily complex for 
> this use case).
> 2. Static splitting on some time partition, e.g. finding the earliest 
> createTime, emitting a PCollection of 1-hour partitions, and paginating 
> through each hour of data within the time frame that the store spans, each in 
> a separate ProcessElement call (easy to implement, but will likely have hot 
> keys / stragglers based on "busy hours").
>  





[jira] [Comment Edited] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization

2020-04-30 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097008#comment-17097008
 ] 

Jacob Ferriero edited comment on BEAM-9856 at 4/30/20, 9:59 PM:


Additional Consideration:
The current ListMessages implementation allows a user to specify a filter. If 
we are leveraging filters in our sharding approach, we should use AND to 
respect the user's filter as well. This could potentially lead to wasted 
queries due to disjoint filtering on sendTime.
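
A minimal sketch of what AND-ing the two filters could look like. The class and 
method names are hypothetical (they are not part of HL7v2IO), and the filter 
field name, comparison syntax, and use of parentheses are assumptions that 
should be checked against the Messages.List documentation. The user filter in 
main is only an arbitrary example.

```java
/** Hypothetical helper for illustration; not part of HL7v2IO. */
final class ShardFilters {
  private ShardFilters() {}

  /** Builds a half-open sendTime range filter. Field name and syntax are assumed. */
  public static String sendTimeRange(String startInclusive, String endExclusive) {
    return "sendTime >= \"" + startInclusive + "\" AND sendTime < \"" + endExclusive + "\"";
  }

  /** ANDs the shard filter with the user's filter, if the user supplied one. */
  public static String withUserFilter(String userFilter, String shardFilter) {
    if (userFilter == null || userFilter.trim().isEmpty()) {
      return shardFilter;
    }
    // Parenthesize both sides so an OR inside the user's filter cannot leak
    // out of the shard's time range.
    return "(" + userFilter.trim() + ") AND (" + shardFilter + ")";
  }

  public static void main(String[] args) {
    String shard = sendTimeRange("2020-04-30T00:00:00Z", "2020-04-30T01:00:00Z");
    System.out.println(withUserFilter("messageType = \"ADT\"", shard));
  }
}
```

If the user's filter already restricts sendTime to a window that does not 
overlap a shard, that shard's query returns nothing, which is the wasted query 
mentioned above.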


was (Author: data-runner0):
Additional Consideration:
The current ListMessages implementation allows a user to specify a filter. If 
we are leveraging filters in our sharding approach we should use AND to respect 
the user's filter as well. This could potentially lead to to wasting queries 
due to disjoint filtering on createTime.

> HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
> --
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> Currently the List Messages API paginates through all results in a single 
> ProcessElement call.
> However, we could derive a restriction based on createTime using the 
> Messages.List filter and orderBy parameters.
>  
> This is in line with the future roadmap: once an HL7v2 bulk export API 
> becomes available, it should allow splitting on some dimension (e.g. create 
> time). Leveraging this bulk export might be a future optimization to explore.
>  
> This could take one of two forms:
> 1. Dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: makes 
> optimization the runner's problem, but potentially unnecessarily complex for 
> this use case).
> 2. Static splitting on some time partition, e.g. finding the earliest 
> createTime, emitting a PCollection of 1-hour partitions, and paginating 
> through each hour of data within the time frame that the store spans, each in 
> a separate ProcessElement call (easy to implement, but will likely have hot 
> keys / stragglers based on "busy hours").
>  





[jira] [Comment Edited] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization

2020-04-30 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096961#comment-17096961
 ] 

Jacob Ferriero edited comment on BEAM-9856 at 4/30/20, 9:59 PM:


The existing GA [HL7v2 Messages.List 
API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list]
 allows us to specify a filter and order by sendTime. We should be able to use 
this as our restriction dimension to make this a splittable DoFn.

I will investigate feasibility of this.


Basic Design Proposal:
Each Messages.List query will have a sendTime filter based on its restriction 
and orderBy sendTime in order to closely mimic the 
[OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html]
 pattern.

* getInitialRestriction: run a query against the HL7v2 store to get the 
earliest sendTime
* RestrictionTracker: keep a "watermark" based on sendTime
* splitRestriction: split the timestamp range based on a fraction (this will 
require that an in-flight HL7v2 Messages.List call can be notified to "stop 
paginating at an arbitrary sendTime"). Note that each List query might not get 
to the end of its sendTime filter (due to splitting). Instead it should just 
stop processing results when it encounters a record past the end of its 
restriction.

This should allow us to more eagerly emit results as certain restrictions are 
completed.
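
As a rough sketch of the "split timestamp range based on fraction" step, the 
split point can be computed on epoch milliseconds in the spirit of the 
offset-range pattern referenced above. The class and method names are 
hypothetical, java.time is used only to keep the example self-contained, and 
this is not the Beam RestrictionTracker API.

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical sketch of fraction-based splitting of a sendTime range. */
final class SendTimeSplit {
  private SendTimeSplit() {}

  /**
   * Returns the instant at which the remaining range [position, end) would be
   * cut, or empty if nothing is left to hand off. The primary keeps
   * [position, split) and the residual becomes [split, end).
   */
  public static Optional<Instant> splitPoint(
      Instant position, Instant end, double fractionOfRemainder) {
    long cur = position.toEpochMilli();
    long remaining = end.toEpochMilli() - cur;
    // Always advance by at least one millisecond so the primary keeps some work.
    long split = cur + Math.max(1L, (long) (remaining * fractionOfRemainder));
    return split >= end.toEpochMilli()
        ? Optional.empty()
        : Optional.of(Instant.ofEpochMilli(split));
  }

  public static void main(String[] args) {
    Instant position = Instant.parse("2020-04-30T00:00:00Z");
    Instant end = Instant.parse("2020-04-30T02:00:00Z");
    System.out.println(splitPoint(position, end, 0.5));
  }
}
```

The in-flight List call for the primary would then stop as soon as it sees a 
record whose sendTime is at or past the split point.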


Open Questions:
* Is there a canonical way of specifying more than one initial restriction, 
based on the assumption that for most use cases you'll want to split at least 
n times upfront (e.g. partition by day/hour to start and dynamically split 
from there)? Is this an anti-pattern because
* Should the initial restriction have an endTime? Should this be an optional 
user parameter for the transform? What should the default be (e.g. 
Instant.now() just before firing the first query)? Or should ListMessages 
just scroll until it is "caught up", i.e. there are no newer messages?


Jake's two cents:
I consider this List Messages transform to primarily serve a batch / bounded 
backfill or replay use case. I believe real-time use cases should use the 
Pub/Sub notifications for event-driven / streaming updates from the HL7v2 store 
with HL7v2IO.readAll() (as this allows for much greater parallelization and is 
more likely to "keep up" during higher throughput). However, there is nothing 
stopping a user from using ListMessages against a store that is still being 
updated. If this becomes a splittable DoFn that fires many ListMessages calls 
throughout its lifetime and our initial restriction is [minSendTime, inf), 
this could become an unbounded source (or more specifically unbounded per 
element). I feel we should force ListMessages to be bounded per element by 
always having an endSendTime.


was (Author: data-runner0):
The existing GA [HL7v2 Messages.List 
API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list]
 allows us to specify a filter and order by createTime. We should be able to 
use this as our restriction dimension to make this a splitable DoFn.

I will investigate feasibility of this.


Basic Design Proposal:
Each Messages.List query will have a createTime filter based on it's 
restriction and orderBy createTime in order closely mimic the 
[OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html]
 pattern.

* getInitialRestriction: run a query against HL7v2 store to get earliest 
createTime 
* RestrictionTracker: keep a "watermark" based on createTime
* splitRestriction: split timestamp range based on fraction (this will require 
an existing HL7v2 Message List call can be notified to "stop paginating through 
the at an arbitrary createTime". Note that each List query might not get to the 
end of it's createTime filter (due to splitting). Instead it should just spot 
processing results when it encounters are record past the end of it's 
restriction.

This should allow us to more eagerly emit results as certain restrictions are 
completed.


Open Questions:
* Is there a canonical way of specifying more than one initial restriction 
based on assumption that you know for most use cases you'll want to split at 
least n-times upfront (e.g. partition by day/hour to start and dynamically 
split from there) ? Is this an anti-pattern because
* Should the initial restriction have a endTime? Should this be an optional 
user parameter for the transform? What should the default be (e.g. 
Instant.now() just before firing the first query)? or Should the ListMessages 
just scroll until it is "caught up" there are no newer messages?


Jake's two cents:
I consider this List Messages transform to primarily serv

[jira] [Commented] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization

2020-04-30 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097008#comment-17097008
 ] 

Jacob Ferriero commented on BEAM-9856:
--

Additional Consideration:
The current ListMessages implementation allows a user to specify a filter. If 
we are leveraging filters in our sharding approach, we should use AND to 
respect the user's filter as well. This could potentially lead to wasted 
queries due to disjoint filtering on createTime.

> HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
> --
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> Currently the List Messages API paginates through all results in a single 
> ProcessElement call.
> However, we could derive a restriction based on createTime using the 
> Messages.List filter and orderBy parameters.
>  
> This is in line with the future roadmap: once an HL7v2 bulk export API 
> becomes available, it should allow splitting on some dimension (e.g. create 
> time). Leveraging this bulk export might be a future optimization to explore.
>  
> This could take one of two forms:
> 1. Dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: makes 
> optimization the runner's problem, but potentially unnecessarily complex for 
> this use case).
> 2. Static splitting on some time partition, e.g. finding the earliest 
> createTime, emitting a PCollection of 1-hour partitions, and paginating 
> through each hour of data within the time frame that the store spans, each in 
> a separate ProcessElement call (easy to implement, but will likely have hot 
> keys / stragglers based on "busy hours").
>  





[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization

2020-04-30 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-9856:
-
Description: 
Currently the List Messages API paginates through all results in a single 
ProcessElement call.
However, we could derive a restriction based on createTime using the 
Messages.List filter and orderBy parameters.
 

This is in line with the future roadmap: once an HL7v2 bulk export API becomes 
available, it should allow splitting on some dimension (e.g. create time). 
Leveraging this bulk export might be a future optimization to explore.
 

This could take one of two forms:
1. Dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: makes 
optimization the runner's problem, but potentially unnecessarily complex for 
this use case).
2. Static splitting on some time partition, e.g. finding the earliest 
createTime, emitting a PCollection of 1-hour partitions, and paginating through 
each hour of data within the time frame that the store spans, each in a 
separate ProcessElement call (easy to implement, but will likely have hot keys 
/ stragglers based on "busy hours").
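
For the static option, a minimal sketch of cutting the store's time span into 
fixed-width partitions could look like the following. The class and method 
names are hypothetical, java.time is used only to keep the example 
self-contained, and in a pipeline each resulting range would become one 
element to paginate through.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of static time partitioning; not part of HL7v2IO. */
final class TimePartitions {
  private TimePartitions() {}

  /** Half-open time range [start, end). */
  public static final class Range {
    public final Instant start;
    public final Instant end;

    Range(Instant start, Instant end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  /** Splits [earliest, end) into consecutive ranges of at most the given width. */
  public static List<Range> split(Instant earliest, Instant end, Duration width) {
    if (width.isZero() || width.isNegative()) {
      throw new IllegalArgumentException("width must be positive");
    }
    List<Range> ranges = new ArrayList<>();
    for (Instant s = earliest; s.isBefore(end); s = s.plus(width)) {
      Instant e = s.plus(width);
      // Clamp the last partition so it never extends past the overall end.
      ranges.add(new Range(s, e.isAfter(end) ? end : e));
    }
    return ranges;
  }

  public static void main(String[] args) {
    Instant earliest = Instant.parse("2020-04-30T00:30:00Z");
    Instant end = Instant.parse("2020-04-30T03:00:00Z");
    split(earliest, end, Duration.ofHours(1)).forEach(System.out::println);
  }
}
```

Partitions of equal width carry unequal numbers of messages, which is where 
the "busy hours" stragglers come from.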

 

  was:
Currently the List Messages API paginates through in a single ProcessElement 
Call.
However we could get a restriction based on createTime using Messages.List 
filter and orderby.
 

This is inline with the future roadmap of  HL7v2 bulk export API becomes 
available that should allow splitting on (e.g. create time dimension). 
Leveraging this bulk export might be  a future optimization to explore.
 

This could take one of two forms:
1. dyanmically splitable via splitable DoFn (sexy, beam idiomatic: make 
optimization the runner's problem, potentially unnecessarily complex for this 
use case )
1. static splitting on some time partition e.g. finding the earliest createTime 
and emitting a PCollection of 1 hour partitions and paginating through each 
hour of data w/ in the time frame that the store spans, in a separate 
ProcessElement. (easy to implement but will likely have hot keys / stragglers 
based on "busy hours")

 


> HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
> --
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> Currently the List Messages API paginates through all results in a single 
> ProcessElement call.
> However, we could derive a restriction based on createTime using the 
> Messages.List filter and orderBy parameters.
>  
> This is in line with the future roadmap: once an HL7v2 bulk export API 
> becomes available, it should allow splitting on some dimension (e.g. create 
> time). Leveraging this bulk export might be a future optimization to explore.
>  
> This could take one of two forms:
> 1. Dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: makes 
> optimization the runner's problem, but potentially unnecessarily complex for 
> this use case).
> 2. Static splitting on some time partition, e.g. finding the earliest 
> createTime, emitting a PCollection of 1-hour partitions, and paginating 
> through each hour of data within the time frame that the store spans, each in 
> a separate ProcessElement call (easy to implement, but will likely have hot 
> keys / stragglers based on "busy hours").
>  





[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization

2020-04-30 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-9856:
-
Description: 
Currently the List Messages API paginates through all results in a single 
ProcessElement call.
However, we could derive a restriction based on createTime using the 
Messages.List filter and orderBy parameters.
 

This is in line with the future roadmap: once an HL7v2 bulk export API becomes 
available, it should allow splitting on some dimension (e.g. create time). 
Leveraging this bulk export might be a future optimization to explore.
 

This could take one of two forms:
1. Dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: makes 
optimization the runner's problem, but potentially unnecessarily complex for 
this use case).
1. Static splitting on some time partition, e.g. finding the earliest 
createTime, emitting a PCollection of 1-hour partitions, and paginating through 
each hour of data within the time frame that the store spans, each in a 
separate ProcessElement call (easy to implement, but will likely have hot keys 
/ stragglers based on "busy hours").

 

  was:
Currently the List Messages API paginates through in a single ProcessElement 
Call.
However we could get a restriction based on createTime using Messages.List 
filter and orderby.
 

This is inline with the future roadmap of  HL7v2 bulk export API becomes 
available that should allow splitting on (e.g. create time dimension). 
Leveraging this bulk export might be  a future optimization to explore.
 

This could look like paginating through each hour of data w/ in the time frame 
that the store spans, in a separate thread.

 


> HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
> --
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> Currently the List Messages API paginates through all results in a single 
> ProcessElement call.
> However, we could derive a restriction based on createTime using the 
> Messages.List filter and orderBy parameters.
>  
> This is in line with the future roadmap: once an HL7v2 bulk export API 
> becomes available, it should allow splitting on some dimension (e.g. create 
> time). Leveraging this bulk export might be a future optimization to explore.
>  
> This could take one of two forms:
> 1. Dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: makes 
> optimization the runner's problem, but potentially unnecessarily complex for 
> this use case).
> 1. Static splitting on some time partition, e.g. finding the earliest 
> createTime, emitting a PCollection of 1-hour partitions, and paginating 
> through each hour of data within the time frame that the store spans, each in 
> a separate ProcessElement call (easy to implement, but will likely have hot 
> keys / stragglers based on "busy hours").
>  





[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn

2020-04-30 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-9856:
-
Description: 
Currently the List Messages API paginates through in a single ProcessElement 
Call.
However we could get a restriction based on createTime using Messages.List 
filter and orderby.
 

This is in line with the future roadmap: once an HL7v2 bulk export API becomes 
available, it should allow splitting on some dimension (e.g. create time). 
Leveraging this bulk export might be a future optimization to explore.
 

This could look like paginating through each hour of data w/ in the time frame 
that the store spans, in a separate thread.

 

  was:
Currently the List Messages API paginates through in a single ProcessElement 
Call.

 

In the future if a bulk export API becomes available that would allow splitting 
on some dimension (e.g. create time), this should be refactored as a splittable 
DoFn or at least run several sub queries so that we are not listing an entire 
store in a single thread.

 

This could look like paginating through each hour of data w/ in the time frame 
that the store spans, in a separate thread.

 


> HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn
> 
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> Currently the List Messages API paginates through in a single ProcessElement 
> Call.
> However we could get a restriction based on createTime using Messages.List 
> filter and orderby.
>  
> This is in line with the future roadmap: once an HL7v2 bulk export API 
> becomes available, it should allow splitting on some dimension (e.g. create 
> time). Leveraging this bulk export might be a future optimization to explore.
>  
> This could look like paginating through each hour of data w/ in the time 
> frame that the store spans, in a separate thread.
>  





[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization

2020-04-30 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-9856:
-
Summary: HL7v2IO.ListHL7v2Messages should be refactored to support more 
parallelization  (was: HL7v2IO.ListHL7v2Messages should be refactored as 
splitable DoFn)

> HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
> --
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> Currently the List Messages API paginates through in a single ProcessElement 
> Call.
> However we could get a restriction based on createTime using Messages.List 
> filter and orderby.
>  
> This is in line with the future roadmap: once an HL7v2 bulk export API 
> becomes available, it should allow splitting on some dimension (e.g. create 
> time). Leveraging this bulk export might be a future optimization to explore.
>  
> This could look like paginating through each hour of data w/ in the time 
> frame that the store spans, in a separate thread.
>  





[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn

2020-04-30 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-9856:
-
Priority: Minor  (was: Major)

> HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn
> 
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> Currently the List Messages API paginates through in a single ProcessElement 
> Call.
>  
> In the future if a bulk export API becomes available that would allow 
> splitting on some dimension (e.g. create time), this should be refactored as 
> a splittable DoFn or at least run several sub queries so that we are not 
> listing an entire store in a single thread.
>  
> This could look like paginating through each hour of data w/ in the time 
> frame that the store spans, in a separate thread.
>  





[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn

2020-04-30 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-9856:
-
Summary: HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn  
(was: HL7v2IO.ListHL7v2Messages should be refactored once Bulk Export API is 
available)

> HL7v2IO.ListHL7v2Messages should be refactored as splitable DoFn
> 
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Major
>
> Currently the List Messages API paginates through in a single ProcessElement 
> Call.
>  
> In the future if a bulk export API becomes available that would allow 
> splitting on some dimension (e.g. create time), this should be refactored as 
> a splittable DoFn or at least run several sub queries so that we are not 
> listing an entire store in a single thread.
>  
> This could look like paginating through each hour of data w/ in the time 
> frame that the store spans, in a separate thread.
>  





[jira] [Updated] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored once Bulk Export API is available

2020-04-30 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-9856:
-
Summary: HL7v2IO.ListHL7v2Messages should be refactored once Bulk Export 
API is available  (was: HL7v2IO.ListMessages should be refactored once Bulk 
Export API is available)

> HL7v2IO.ListHL7v2Messages should be refactored once Bulk Export API is 
> available
> 
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Major
>
> Currently the List Messages API paginates through in a single ProcessElement 
> Call.
>  
> In the future if a bulk export API becomes available that would allow 
> splitting on some dimension (e.g. create time), this should be refactored as 
> a splittable DoFn or at least run several sub queries so that we are not 
> listing an entire store in a single thread.
>  
> This could look like paginating through each hour of data w/ in the time 
> frame that the store spans, in a separate thread.
>  





[jira] [Commented] (BEAM-9856) HL7v2IO.ListMessages should be refactored once Bulk Export API is available

2020-04-30 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096961#comment-17096961
 ] 

Jacob Ferriero commented on BEAM-9856:
--

The existing GA [HL7v2 Messages.List 
API|https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages/list]
 allows us to specify a filter and order by createTime. We should be able to 
use this as our restriction dimension to make this a splittable DoFn.

I will investigate feasibility of this.


Basic Design Proposal:
Each Messages.List query will have a createTime filter based on its 
restriction and orderBy createTime, in order to closely mimic the 
[OffsetRangeTracker|https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.html]
 pattern.

* getInitialRestriction: run a query against the HL7v2 store to get the earliest 
createTime
* RestrictionTracker: keep a "watermark" based on createTime
* splitRestriction: split the timestamp range based on a fraction (this will 
require that an existing HL7v2 Message List call can be notified to "stop 
paginating at an arbitrary createTime"). Note that each List query might not 
get to the end of its createTime filter (due to splitting). Instead it should 
just stop processing results when it encounters a record past the end of its 
restriction.

This should allow us to more eagerly emit results as certain restrictions are 
completed.
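
To make the splitting idea above concrete, here is a minimal sketch in plain Java. It is not Beam code and not the eventual implementation: CreateTimeRange, splitAt and contains are hypothetical names, and the sketch assumes a half-open [start, end) range over createTime.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical half-open createTime range [start, end); not a Beam class. */
final class CreateTimeRange {
  final Instant start;
  final Instant end;

  CreateTimeRange(Instant start, Instant end) {
    if (end.isBefore(start)) {
      throw new IllegalArgumentException("end must not be before start");
    }
    this.start = start;
    this.end = end;
  }

  /** Splits the range into [start, mid) and [mid, end) at the given fraction. */
  CreateTimeRange[] splitAt(double fraction) {
    if (fraction < 0.0 || fraction > 1.0) {
      throw new IllegalArgumentException("fraction must be in [0, 1]");
    }
    long spanMillis = Duration.between(start, end).toMillis();
    Instant mid = start.plusMillis((long) (spanMillis * fraction));
    return new CreateTimeRange[] {
      new CreateTimeRange(start, mid), new CreateTimeRange(mid, end)
    };
  }

  /**
   * Returns true while a record's createTime is still inside this range.
   * With results ordered by createTime, the first false means "stop paginating".
   */
  boolean contains(Instant createTime) {
    return !createTime.isBefore(start) && createTime.isBefore(end);
  }
}
```

A List call working on the first half would stop as soon as contains() returns false for a record, leaving the rest to whichever worker picks up the second half.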


Open Questions:
* Is there a canonical way of specifying more than one initial restriction, 
based on the assumption that for most use cases you'll want to split at 
least n times upfront (e.g. partition by day/hour to start and dynamically 
split from there)? Is this an anti-pattern because
* Should the initial restriction have an endTime? Should this be an optional 
user parameter for the transform? What should the default be (e.g. 
Instant.now() just before firing the first query)? Or should ListMessages 
just scroll until it is "caught up" (there are no newer messages)?
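
For the first question, the upfront partitioning could look roughly like the sketch below. This is plain Java for illustration only: InitialPartitions and partition are hypothetical names, and whether Beam has a canonical way to express several initial restrictions is exactly the open question.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: cuts [start, end) into consecutive fixed-size pieces. */
final class InitialPartitions {
  private InitialPartitions() {}

  /** Each returned array holds {from, to} for one half-open piece [from, to). */
  static List<Instant[]> partition(Instant start, Instant end, Duration step) {
    if (step.isZero() || step.isNegative()) {
      throw new IllegalArgumentException("step must be positive");
    }
    List<Instant[]> pieces = new ArrayList<>();
    Instant cursor = start;
    while (cursor.isBefore(end)) {
      Instant next = cursor.plus(step);
      if (next.isAfter(end)) {
        next = end; // the last piece may be shorter than step
      }
      pieces.add(new Instant[] {cursor, next});
      cursor = next;
    }
    return pieces;
  }
}
```

Calling partition(start, end, Duration.ofHours(1)) gives one piece per hour, each of which could then be split dynamically from there.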


Jake's two cents:
I consider this List Messages transform to primarily serve a batch / bounded 
backfill or replay use case. I believe real-time use cases should use the 
Pub/Sub notifications for event-driven / streaming updates from the HL7v2 store 
with HL7v2IO.readAll() (as this allows for much greater parallelization and is 
more likely to "keep up" during higher throughput). However, there is nothing 
stopping a user from using ListMessages against a store that is still being 
updated. If this becomes a splittable DoFn that fires many ListMessages calls 
throughout its lifetime and our initial restriction is [minCreateTime, inf), 
this could become an unbounded source (or more specifically, unbounded per 
element). I feel we should force ListMessages to be bounded per element by 
always having an endCreateTime.
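
One way to always end up with an end bound is sketched below in plain Java. This is a sketch, not the transform's API: BoundedListWindow and resolveEndCreateTime are hypothetical names, and defaulting to "now" is only the candidate default raised in the open questions above.

```java
import java.time.Clock;
import java.time.Instant;

/** Hypothetical helper that guarantees a finite upper bound on createTime. */
final class BoundedListWindow {
  private BoundedListWindow() {}

  /**
   * Uses the user-supplied end if present; otherwise pins the end to the
   * clock's current instant, read once before the first List query fires.
   */
  static Instant resolveEndCreateTime(Instant userSuppliedEnd, Clock clock) {
    return userSuppliedEnd != null ? userSuppliedEnd : clock.instant();
  }
}
```

Taking a Clock rather than calling Instant.now() directly keeps the default testable with Clock.fixed(...).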

> HL7v2IO.ListMessages should be refactored once Bulk Export API is available
> ---
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Major
>
> Currently the List Messages API paginates through in a single ProcessElement 
> Call.
>  
> In the future if a bulk export API becomes available that would allow 
> splitting on some dimension (e.g. create time), this should be refactored as 
> a splittable DoFn or at least run several sub queries so that we are not 
> listing an entire store in a single thread.
>  
> This could look like paginating through each hour of data w/ in the time 
> frame that the store spans, in a separate thread.
>  





[jira] [Assigned] (BEAM-9856) HL7v2IO.ListMessages should be refactored once Bulk Export API is available

2020-04-30 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero reassigned BEAM-9856:


Assignee: Jacob Ferriero

> HL7v2IO.ListMessages should be refactored once Bulk Export API is available
> ---
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Major
>
> Currently the List Messages API paginates through in a single ProcessElement 
> Call.
>  
> In the future if a bulk export API becomes available that would allow 
> splitting on some dimension (e.g. create time), this should be refactored as 
> a splittable DoFn or at least run several sub queries so that we are not 
> listing an entire store in a single thread.
>  
> This could look like paginating through each hour of data w/ in the time 
> frame that the store spans, in a separate thread.
>  





[jira] [Updated] (BEAM-9856) HL7v2IO.ListMessages should be refactored once Bulk Export API is available

2020-04-29 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-9856:
-
Description: 
Currently the List Messages API paginates through in a single ProcessElement 
Call.

 

In the future if a bulk export API becomes available that would allow splitting 
on some dimension (e.g. create time), this should be refactored as a splittable 
DoFn or at least run several sub queries so that we are not listing an entire 
store in a single thread.

 

This could look like paginating through each hour of data within the time frame 
that the store spans, each in a separate thread.

 

  was:
Currently the List Messages API paginates through in a single ProcessElement 
Call.

 

In the future if a bulk export API becomes available that would allow splitting 
on some dimension (e.g. create time), this should be refactored as a splittable 
DoFn or at least run several sub queries so that we are not listing an entire 
store in a single thread.

 

 


> HL7v2IO.ListMessages should be refactored once Bulk Export API is available
> ---
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Priority: Major
>
> Currently the List Messages API paginates through in a single ProcessElement 
> Call.
>  
> In the future if a bulk export API becomes available that would allow 
> splitting on some dimension (e.g. create time), this should be refactored as 
> a splittable DoFn or at least run several sub queries so that we are not 
> listing an entire store in a single thread.
>  
> This could look like paginating through each hour of data w/ in the time 
> frame that the store spans, in a separate thread.
>  





[jira] [Updated] (BEAM-9856) HL7v2IO.ListMessages should be refactored once Bulk Export API is available

2020-04-29 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero updated BEAM-9856:
-
Description: 
Currently the List Messages API paginates through in a single ProcessElement 
Call.

 

In the future if a bulk export API becomes available that would allow splitting 
on some dimension (e.g. create time), this should be refactored as a splittable 
DoFn or at least run several sub queries so that we are not listing an entire 
store in a single thread.

 

 

  was:
Currently the List Messages API paginates through in a single ProcessElement 
Call.

 

In the future if a bulk export API becomes available that would allow splitting 
on some dimension (e.g. create time), this should be refactored as a splittable 
DoFn.

 

 


> HL7v2IO.ListMessages should be refactored once Bulk Export API is available
> ---
>
> Key: BEAM-9856
> URL: https://issues.apache.org/jira/browse/BEAM-9856
> Project: Beam
>  Issue Type: Improvement
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Priority: Major
>
> Currently the List Messages API paginates through in a single ProcessElement 
> Call.
>  
> In the future if a bulk export API becomes available that would allow 
> splitting on some dimension (e.g. create time), this should be refactored as 
> a splittable DoFn or at least run several sub queries so that we are not 
> listing an entire store in a single thread.
>  
>  





[jira] [Created] (BEAM-9856) HL7v2IO.ListMessages should be refactored once Bulk Export API is available

2020-04-29 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-9856:


 Summary: HL7v2IO.ListMessages should be refactored once Bulk 
Export API is available
 Key: BEAM-9856
 URL: https://issues.apache.org/jira/browse/BEAM-9856
 Project: Beam
  Issue Type: Improvement
  Components: io-java-gcp
Reporter: Jacob Ferriero


Currently the transform paginates through all List Messages API results in a 
single ProcessElement call.

 

In the future if a bulk export API becomes available that would allow splitting 
on some dimension (e.g. create time), this should be refactored as a splittable 
DoFn.

 

 





[jira] [Commented] (BEAM-9847) Verify If Triggering allows emitting eager results when processing a single element in HL7v2IO.

2020-04-28 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094947#comment-17094947
 ] 

Jacob Ferriero commented on BEAM-9847:
--

This test seems to confirm that it is not possible to emit the outputs of a 
processElement call before it has completed.

[https://gist.github.com/jaketf/d3c2e70dde781bbb0ef1993446e34b71]

> Verify If Triggering allows emitting eager results when processing a single 
> element in HL7v2IO.
> ---
>
> Key: BEAM-9847
> URL: https://issues.apache.org/jira/browse/BEAM-9847
> Project: Beam
>  Issue Type: Task
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> Due to the nature of the HL7v2 API, HL7v2IO.ListHL7v2Messages follows the 
> pattern of [paginating through all ListMessages results in a single 
> ProcessElement call.|#diff-9cd595f078378218ccc01ce7e19ca766R447]]
>  
> Upon testing with a customer against an HL7v2 store with 350k messages, we 
> observed that the ListMessages transform was not outputting any elements and 
> appeared to be "hanging" for a long time (which was assumed to be the single 
> thread paginating through all the results).
>  
> We added the following triggering in hopes that it would emit early results:
> {code:java}
>   .apply(
>       Window.into(new GlobalWindows())
>           .triggering(
>               AfterWatermark.pastEndOfWindow()
>                   .withEarlyFirings(
>                       AfterProcessingTime.pastFirstElementInPane()
>                           .plusDelayOf(Duration.standardSeconds(1))))
>           .discardingFiredPanes())
> {code}
> Our tests with this triggering seemed to indicate that it did not "hang" like 
> the first test and seemed to output a more steady stream of elements.
> A reviewer states that bundles must be committed atomically, so no output 
> elements of a single processElement call can proceed to downstream stages 
> until all output elements for that call are ready.
> There may be other things at play here. Will seek to reproduce in a way that 
> definitively confirms output elements can be eagerly output during the 
> execution of a single process element call before it completes.
> CC: [~pabloem]
>  
>  





[jira] [Created] (BEAM-9847) Verify If Triggering allows emitting eager results when processing a single element in HL7v2IO.

2020-04-28 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-9847:


 Summary: Verify If Triggering allows emitting eager results when 
processing a single element in HL7v2IO.
 Key: BEAM-9847
 URL: https://issues.apache.org/jira/browse/BEAM-9847
 Project: Beam
  Issue Type: Task
  Components: io-java-gcp
Reporter: Jacob Ferriero
Assignee: Jacob Ferriero


Due to the nature of the HL7v2 API, HL7v2IO.ListHL7v2Messages follows the 
pattern of [paginating through all ListMessages results in a single 
ProcessElement call.|#diff-9cd595f078378218ccc01ce7e19ca766R447]]

 

Upon testing with a customer against an HL7v2 store with 350k messages, we 
observed that the ListMessages transform was not outputting any elements and 
appeared to be "hanging" for a long time (which was assumed to be the single 
thread paginating through all the results).

 
We added the following triggering in hopes that it would emit early results:

{code:java}
  .apply(
      Window.into(new GlobalWindows())
          .triggering(
              AfterWatermark.pastEndOfWindow()
                  // Fire an early pane 1s after the first element in each pane.
                  .withEarlyFirings(
                      AfterProcessingTime.pastFirstElementInPane()
                          .plusDelayOf(Duration.standardSeconds(1))))
          .discardingFiredPanes())
{code}


Our tests with this triggering seemed to indicate that it did not "hang" like 
the first test and seemed to output a more steady stream of elements.

A reviewer states that bundles must be committed atomically, so no output 
elements of a single processElement call can proceed to downstream stages until 
all output elements for that call are ready.
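
The reviewer's point can be pictured with a toy model in plain Java. This is not Beam code and says nothing about how any runner is implemented: BundleSketch and its methods are hypothetical, and the only thing modelled is "outputs become visible downstream when the bundle commits".

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of atomic bundle commit; purely illustrative. */
final class BundleSketch {
  private final List<String> pending = new ArrayList<>();
  private final List<String> committed = new ArrayList<>();

  /** Called from inside the (simulated) processElement call. */
  void output(String element) {
    pending.add(element);
  }

  /** Commits everything produced so far in one step. */
  void finishBundle() {
    committed.addAll(pending);
    pending.clear();
  }

  /** What a downstream stage can see at this moment. */
  List<String> visibleDownstream() {
    return Collections.unmodifiableList(new ArrayList<>(committed));
  }
}
```

In this model, a long pagination loop that calls output() 350k times shows nothing downstream until finishBundle() runs, which matches the "hanging" behaviour described above.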

There may be other things at play here. Will seek to reproduce in a way that 
definitively confirms output elements can be eagerly output during the 
execution of a single process element call before it completes.

CC: [~pabloem]

 

 





[jira] [Assigned] (BEAM-9831) HL7v2IO Improvements

2020-04-27 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero reassigned BEAM-9831:


Assignee: Jacob Ferriero

> HL7v2IO Improvements
> 
>
> Key: BEAM-9831
> URL: https://issues.apache.org/jira/browse/BEAM-9831
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Major
>
> # The HL7v2MessageCoder constructor should be public for use by end users.
> # Currently HL7v2IO.ListHL7v2Messages blocks on pagination through list 
> messages results before emitting any output data elements (due to high fan 
> out from a single input element). We should add early firings so that 
> downstream processing can proceed on early pages while later pages are still 
> being scrolled through.
> # We should drop all output-only fields of HL7v2Message and only keep data 
> and labels when calling ingestMessages, rather than expecting the user to do 
> this.





[jira] [Created] (BEAM-9831) HL7v2IO Improvements

2020-04-27 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-9831:


 Summary: HL7v2IO Improvements
 Key: BEAM-9831
 URL: https://issues.apache.org/jira/browse/BEAM-9831
 Project: Beam
  Issue Type: Bug
  Components: io-java-gcp
Reporter: Jacob Ferriero


# The HL7v2MessageCoder constructor should be public for use by end users.
# Currently HL7v2IO.ListHL7v2Messages blocks on pagination through list 
messages results before emitting any output data elements (due to high fan out 
from a single input element). We should add early firings so that downstream 
processing can proceed on early pages while later pages are still being 
scrolled through.
# We should drop all output-only fields of HL7v2Message and only keep data and 
labels when calling ingestMessages, rather than expecting the user to do this.
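
Item 3 could be sketched as below. This uses a hypothetical stand-in class in plain Java, not the real HL7v2Message: MessageSketch and forIngest are made-up names, and treating name and createTime as the output-only fields is an assumption for illustration.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an HL7v2 message; field names are assumptions. */
final class MessageSketch {
  final String name;       // assumed to be assigned by the store (output only)
  final String createTime; // assumed to be assigned by the store (output only)
  final String data;
  final Map<String, String> labels;

  MessageSketch(String name, String createTime, String data, Map<String, String> labels) {
    this.name = name;
    this.createTime = createTime;
    this.data = data;
    this.labels = Collections.unmodifiableMap(new HashMap<>(labels));
  }

  /** Returns a copy that keeps only data and labels, ready to be ingested. */
  MessageSketch forIngest() {
    return new MessageSketch(null, null, data, labels);
  }
}
```

Doing this inside the write path would let users pass messages they previously read from a store straight back into ingestMessages.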





[jira] [Commented] (BEAM-9779) HL7v2IOWriteIT is flaky

2020-04-17 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086063#comment-17086063
 ] 

Jacob Ferriero commented on BEAM-9779:
--

Working theory: I wasn't able to reproduce this locally because I have higher 
latency to the HL7v2 store in us-central1 than the Jenkins VMs do.

> HL7v2IOWriteIT is flaky
> ---
>
> Key: BEAM-9779
> URL: https://issues.apache.org/jira/browse/BEAM-9779
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> There seems to be a race condition somewhere in HL7v2IOWriteIT that causes 
> flakiness.
> https://builds.apache.org/job/beam_PostCommit_Java/5947/
> https://builds.apache.org/job/beam_PostCommit_Java/5943/
> https://builds.apache.org/job/beam_PostCommit_Java/5942/





[jira] [Comment Edited] (BEAM-9779) HL7v2IOWriteIT is flaky

2020-04-17 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086057#comment-17086057
 ] 

Jacob Ferriero edited comment on BEAM-9779 at 4/17/20, 8:44 PM:


It seems that the HL7v2 store is indexed asynchronously.
Will add a 5s sleep to stabilize this test.


was (Author: data-runner0):
It seems that the HL7v2 store is indexed asynchronously.
Will add a 10s sleep to stabilize this test.

> HL7v2IOWriteIT is flaky
> ---
>
> Key: BEAM-9779
> URL: https://issues.apache.org/jira/browse/BEAM-9779
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> There seems to be a race condition somewhere in HL7v2IOWriteIT that causes 
> flakiness.
> https://builds.apache.org/job/beam_PostCommit_Java/5947/
> https://builds.apache.org/job/beam_PostCommit_Java/5943/
> https://builds.apache.org/job/beam_PostCommit_Java/5942/





[jira] [Commented] (BEAM-9779) HL7v2IOWriteIT is flaky

2020-04-17 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086057#comment-17086057
 ] 

Jacob Ferriero commented on BEAM-9779:
--

It seems that the HL7v2 store is indexed asynchronously.
Will add a 10s sleep to stabilize this test.
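
The fixed sleep is the fix proposed here. As an alternative that is not part of this ticket, a bounded polling loop would wait only as long as indexing actually takes. The sketch below is plain Java: EventualCount and awaitCount are hypothetical names, and the count supplier stands in for whatever call the test uses to list messages.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/** Hypothetical test helper: poll until a count matches or a timeout expires. */
final class EventualCount {
  private EventualCount() {}

  /** Returns true if the supplier reported the expected count before the timeout. */
  static boolean awaitCount(
      LongSupplier currentCount, long expected, Duration timeout, Duration pollInterval) {
    long deadlineNanos = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (currentCount.getAsLong() == expected) {
        return true;
      }
      if (System.nanoTime() - deadlineNanos >= 0) {
        return false; // gave up: the count never matched in time
      }
      try {
        Thread.sleep(Math.max(1L, pollInterval.toMillis()));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for callers
        return false;
      }
    }
  }
}
```

A test could then assert on awaitCount(..., Duration.ofSeconds(10), Duration.ofMillis(500)), keeping the same 10s budget as an upper bound.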

> HL7v2IOWriteIT is flaky
> ---
>
> Key: BEAM-9779
> URL: https://issues.apache.org/jira/browse/BEAM-9779
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> There seems to be a race condition somewhere in HL7v2IOWriteIT that causes 
> flakiness.
> https://builds.apache.org/job/beam_PostCommit_Java/5947/
> https://builds.apache.org/job/beam_PostCommit_Java/5943/
> https://builds.apache.org/job/beam_PostCommit_Java/5942/





[jira] [Commented] (BEAM-9779) HL7v2IOWriteIT is flaky

2020-04-17 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17086053#comment-17086053
 ] 

Jacob Ferriero commented on BEAM-9779:
--

I've been able to run this test many times locally but have been unable to 
reproduce the failure.

It blocks on a pipeline run that writes messages to the HL7v2 store and then 
lists messages to verify that count in = count out.

The failure pattern seems to be that, intermittently, fewer messages than 
expected are written to the HL7v2 store.


> HL7v2IOWriteIT is flaky
> ---
>
> Key: BEAM-9779
> URL: https://issues.apache.org/jira/browse/BEAM-9779
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> There seems to be a race condition somewhere in HL7v2IOWriteIT that causes 
> flakiness.
> https://builds.apache.org/job/beam_PostCommit_Java/5947/
> https://builds.apache.org/job/beam_PostCommit_Java/5943/
> https://builds.apache.org/job/beam_PostCommit_Java/5942/





[jira] [Assigned] (BEAM-9779) HL7v2IOWriteIT is flaky

2020-04-17 Thread Jacob Ferriero (Jira)


 [ 
https://issues.apache.org/jira/browse/BEAM-9779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacob Ferriero reassigned BEAM-9779:


Assignee: Jacob Ferriero

> HL7v2IOWriteIT is flaky
> ---
>
> Key: BEAM-9779
> URL: https://issues.apache.org/jira/browse/BEAM-9779
> Project: Beam
>  Issue Type: Bug
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Assignee: Jacob Ferriero
>Priority: Minor
>
> There seems to be a race condition somewhere in HL7v2IOWriteIT that causes 
> flakiness.
> https://builds.apache.org/job/beam_PostCommit_Java/5947/
> https://builds.apache.org/job/beam_PostCommit_Java/5943/
> https://builds.apache.org/job/beam_PostCommit_Java/5942/





[jira] [Created] (BEAM-9779) HL7v2IOWriteIT is flaky

2020-04-17 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-9779:


 Summary: HL7v2IOWriteIT is flaky
 Key: BEAM-9779
 URL: https://issues.apache.org/jira/browse/BEAM-9779
 Project: Beam
  Issue Type: Bug
  Components: io-java-gcp
Reporter: Jacob Ferriero


There seems to be a race condition somewhere in HL7v2IOWriteIT that causes 
flakiness.

https://builds.apache.org/job/beam_PostCommit_Java/5947/
https://builds.apache.org/job/beam_PostCommit_Java/5943/
https://builds.apache.org/job/beam_PostCommit_Java/5942/





[jira] [Commented] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors

2020-03-11 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17057511#comment-17057511
 ] 

Jacob Ferriero commented on BEAM-9468:
--

https://github.com/apache/beam/pull/11107

> Add Google Cloud Healthcare API IO Connectors
> -
>
> Key: BEAM-9468
> URL: https://issues.apache.org/jira/browse/BEAM-9468
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Priority: Minor
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud 
> Healthcare API|https://cloud.google.com/healthcare/docs/]
> HL7v2IO
> FHIRIO
> DICOM 





[jira] [Commented] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors

2020-03-10 Thread Jacob Ferriero (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056127#comment-17056127
 ] 

Jacob Ferriero commented on BEAM-9468:
--

I am starting a prototype on this work. 
PR to come in a week or so.

> Add Google Cloud Healthcare API IO Connectors
> -
>
> Key: BEAM-9468
> URL: https://issues.apache.org/jira/browse/BEAM-9468
> Project: Beam
>  Issue Type: New Feature
>  Components: io-java-gcp
>Reporter: Jacob Ferriero
>Priority: Minor
>
> Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud 
> Healthcare API|https://cloud.google.com/healthcare/docs/]
> HL7v2IO
> FHIRIO
> DICOM 





[jira] [Created] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors

2020-03-08 Thread Jacob Ferriero (Jira)
Jacob Ferriero created BEAM-9468:


 Summary: Add Google Cloud Healthcare API IO Connectors
 Key: BEAM-9468
 URL: https://issues.apache.org/jira/browse/BEAM-9468
 Project: Beam
  Issue Type: New Feature
  Components: io-java-gcp
Reporter: Jacob Ferriero


Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud 
Healthcare API|https://cloud.google.com/healthcare/docs/]

HL7v2IO

FHIRIO

DICOM 


