[jira] [Work logged] (BEAM-9900) Remove the need for shutdownSourcesOnFinalWatermark flag
[ https://issues.apache.org/jira/browse/BEAM-9900?focusedWorklogId=435364&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435364 ]

ASF GitHub Bot logged work on BEAM-9900:
Author: ASF GitHub Bot
Created on: 20/May/20 06:41
Start Date: 20/May/20 06:41
Worklog Time Spent: 10m

Work Description: dmvk commented on pull request #11750:
URL: https://github.com/apache/beam/pull/11750#issuecomment-631269828
Thanks for the fix. 👍 🎉

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
---
Worklog Id: (was: 435364)
Time Spent: 1h 40m (was: 1.5h)

> Remove the need for shutdownSourcesOnFinalWatermark flag
>
> Key: BEAM-9900
> URL: https://issues.apache.org/jira/browse/BEAM-9900
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: P2
> Fix For: 2.22.0
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> The {{shutdownSourcesOnFinalWatermark}} flag has caused some confusion in the
> past. It is generally used for testing pipelines, to ensure that the pipeline
> and the testing cluster shut down at the end of the job. Without it, the
> pipeline will run forever in streaming mode, regardless of whether the input
> is finite.
> We didn't want to enable the flag by default because shutting down any
> operators, including sources, in Flink will prevent checkpointing from
> working. If we have a side input, for example, that may be the case even for
> long-running pipelines. However, we can detect whether checkpointing is
> enabled and set the flag automatically.
> The only situation where we may want the flag to be disabled is when users do
> not have checkpointing enabled but want to take a savepoint. This should be
> rare, and users can mitigate by setting the flag to false to prevent
> operators from shutting down.
--
This message was sent by Atlassian Jira (v8.3.4#803005)
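The auto-detection described in the issue can be sketched as a tiny decision function. This is a stdlib-only illustration with hypothetical names, not the actual FlinkPipelineOptions API: sources may only be shut down on the final watermark when checkpointing is not enabled (since Flink cannot checkpoint once an operator has finished), and an explicit user setting always wins.

```java
public class ShutdownSourcesPolicy {
    /**
     * Hypothetical helper mirroring the behavior described in BEAM-9900.
     *
     * @param checkpointingEnabled whether Flink checkpointing is configured
     * @param userOverride an explicit user setting, or null if unset
     * @return whether sources should shut down on the final watermark
     */
    public static boolean shouldShutdownSources(boolean checkpointingEnabled, Boolean userOverride) {
        if (userOverride != null) {
            return userOverride; // e.g. forced to false to allow taking a savepoint
        }
        // Shutting down finished operators breaks checkpointing, so only
        // shut down automatically when checkpointing is disabled.
        return !checkpointingEnabled;
    }

    public static void main(String[] args) {
        System.out.println(shouldShutdownSources(false, null)); // true
        System.out.println(shouldShutdownSources(true, null));  // false
        System.out.println(shouldShutdownSources(true, true));  // true (user override)
    }
}
```

The savepoint corner case from the issue maps to passing an explicit `false` override while checkpointing is disabled.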
[jira] [Work logged] (BEAM-9900) Remove the need for shutdownSourcesOnFinalWatermark flag
[ https://issues.apache.org/jira/browse/BEAM-9900?focusedWorklogId=435363&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435363 ]

ASF GitHub Bot logged work on BEAM-9900:
Author: ASF GitHub Bot
Created on: 20/May/20 06:40
Start Date: 20/May/20 06:40
Worklog Time Spent: 10m

Work Description: dmvk merged pull request #11750:
URL: https://github.com/apache/beam/pull/11750

Issue Time Tracking
---
Worklog Id: (was: 435363)
Time Spent: 1.5h (was: 1h 20m)
[jira] [Work logged] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?focusedWorklogId=435362&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435362 ]

ASF GitHub Bot logged work on BEAM-9468:
Author: ASF GitHub Bot
Created on: 20/May/20 06:31
Start Date: 20/May/20 06:31
Worklog Time Spent: 10m

Work Description: jaketf edited a comment on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631265943
I think that something about `TestPubsubSignal` does not play well with the way this integration test suite runs on Dataflow. Following suit with `PubsubReadIT` (which also uses `TestPubsubSignal`), `BigQueryIOReadIT`, and `BigQueryIOStorageReadTableRowIT`, I've added this test to the excludes of this project's build file. The test can still be run (and passes) using DirectRunner, and the evidence in the [above comment](https://github.com/apache/beam/pull/11339#issuecomment-631203002) points to this being a red herring: an issue with TestPubsubSignal and the Dataflow runner rather than with the behavior under test. Potentially related issue: [BEAM-6804](https://issues.apache.org/jira/browse/BEAM-6804?jql=text%20~%20%22TestPubsubSignal%22). I've filed [BEAM-10040](https://issues.apache.org/jira/browse/BEAM-10040) for this instance. @pabloem please let me know if this is acceptable.

Issue Time Tracking
---
Worklog Id: (was: 435362)
Time Spent: 48h 20m (was: 48h 10m)

> Add Google Cloud Healthcare API IO Connectors
> -
> Key: BEAM-9468
> URL: https://issues.apache.org/jira/browse/BEAM-9468
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp
> Reporter: Jacob Ferriero
> Assignee: Jacob Ferriero
> Priority: P3
> Time Spent: 48h 20m
> Remaining Estimate: 0h
>
> Add IO transforms for the HL7v2, FHIR, and DICOM stores in the [Google Cloud
> Healthcare API|https://cloud.google.com/healthcare/docs/]:
> HL7v2IO
> FHIRIO
> DICOM
[jira] [Updated] (BEAM-10040) TestPubsubSignal not signalling success w/ Dataflow Runner
[ https://issues.apache.org/jira/browse/BEAM-10040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jacob Ferriero updated BEAM-10040:
--
Issue Type: Bug (was: Improvement)

> TestPubsubSignal not signalling success w/ Dataflow Runner
> --
> Key: BEAM-10040
> URL: https://issues.apache.org/jira/browse/BEAM-10040
> Project: Beam
> Issue Type: Bug
> Components: test-failures
> Reporter: Jacob Ferriero
> Priority: P2
>
> The issue with FhirIOReadIT seems to be some misuse of TestPubsubSignal. The
> [Example
> Job|https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing]
> clearly has the expected >2000 elements added to the "waitForAnyMessage" task,
> but the success signal never gets published to the results topic.
> Notably there are job-level warnings about metric descriptors and [warnings
> in shuffle
> logs|https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054&timestamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z]
> which warn:
> "Update range task returned 'invalid argument'. Assuming lost lease for work
> with id 5061980071068333770 (expiration time: 1589940982000, now:
> 1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For
> more information, see
> https://cloud.google.com/dataflow/docs/guides/common-errors."
[jira] [Created] (BEAM-10040) TestPubsubSignal not signalling success w/ Dataflow Runner
Jacob Ferriero created BEAM-10040:
--
Summary: TestPubsubSignal not signalling success w/ Dataflow Runner
Key: BEAM-10040
URL: https://issues.apache.org/jira/browse/BEAM-10040
Project: Beam
Issue Type: Improvement
Components: test-failures
Reporter: Jacob Ferriero

The issue with FhirIOReadIT seems to be some misuse of TestPubsubSignal. The [Example Job|https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing] clearly has the expected >2000 elements added to the "waitForAnyMessage" task, but the success signal never gets published to the results topic.
Notably there are job-level warnings about metric descriptors and [warnings in shuffle logs|https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054&timestamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z] which warn:
"Update range task returned 'invalid argument'. Assuming lost lease for work with id 5061980071068333770 (expiration time: 1589940982000, now: 1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors."
[jira] [Work logged] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?focusedWorklogId=435361&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435361 ]

ASF GitHub Bot logged work on BEAM-9468:
Author: ASF GitHub Bot
Created on: 20/May/20 06:30
Start Date: 20/May/20 06:30
Worklog Time Spent: 10m

Work Description: jaketf commented on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631265943
I think that something about `TestPubsubSignal` does not play well with the way this integration test suite runs on Dataflow. Following suit with `PubsubReadIT` (which also uses `TestPubsubSignal`), `BigQueryIOReadIT`, and `BigQueryIOStorageReadTableRowIT`, I've added this test to the excludes of this project's build file. The test can still be run (and passes) using DirectRunner, and the evidence in the [above comment](https://github.com/apache/beam/pull/11339#issuecomment-631203002) points to this being a red herring: an issue with TestPubsubSignal and the Dataflow runner rather than with the behavior under test. Potentially related issue: [BEAM-6804](https://issues.apache.org/jira/browse/BEAM-6804?jql=text%20~%20%22TestPubsubSignal%22). I've filed an issue for this instance. @pabloem please let me know if this is acceptable.

Issue Time Tracking
---
Worklog Id: (was: 435361)
Time Spent: 48h 10m (was: 48h)
[jira] [Work logged] (BEAM-9679) Core Transforms | Go SDK Code Katas
[ https://issues.apache.org/jira/browse/BEAM-9679?focusedWorklogId=435360&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435360 ]

ASF GitHub Bot logged work on BEAM-9679:
Author: ASF GitHub Bot
Created on: 20/May/20 06:18
Start Date: 20/May/20 06:18
Worklog Time Spent: 10m

Work Description: damondouglas commented on pull request #11734:
URL: https://github.com/apache/beam/pull/11734#issuecomment-631261638
@lostluck and @henryken, I've updated the [stepik](https://stepik.org/course/70387) course and committed the `*-remote.yaml` files to this PR. Thank you again for all your help and guidance.

Issue Time Tracking
---
Worklog Id: (was: 435360)
Time Spent: 1h 40m (was: 1.5h)

> Core Transforms | Go SDK Code Katas
> ---
> Key: BEAM-9679
> URL: https://issues.apache.org/jira/browse/BEAM-9679
> Project: Beam
> Issue Type: Sub-task
> Components: katas, sdk-go
> Reporter: Damon Douglas
> Assignee: Damon Douglas
> Priority: P2
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> A kata devoted to core Beam transform patterns, after
> [https://github.com/apache/beam/tree/master/learning/katas/java/Core%20Transforms],
> where the takeaway is an individual's ability to master the following in an
> Apache Beam pipeline written with the Go SDK:
> * Branching
> * [CoGroupByKey|https://github.com/damondouglas/beam/tree/BEAM-9679-core-transform-groupbykey]
> * Combine
> * Composite Transform
> * DoFn Additional Parameters
> * Flatten
> * GroupByKey
> * [Map|https://github.com/apache/beam/pull/11564]
> * Partition
> * Side Input
[jira] [Updated] (BEAM-10030) Add CSVIO for Java SDK
[ https://issues.apache.org/jira/browse/BEAM-10030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Saurabh Joshi updated BEAM-10030:
--
Component/s: (was: sdk-java-core) io-java-files

> Add CSVIO for Java SDK
> --
> Key: BEAM-10030
> URL: https://issues.apache.org/jira/browse/BEAM-10030
> Project: Beam
> Issue Type: New Feature
> Components: io-ideas, io-java-files
> Reporter: Saurabh Joshi
> Priority: P2
> Attachments: CSVSource.java
>
> Apache Beam has a TextIO class which can read text-based files line by line,
> delimited by either a carriage return, a newline, or a carriage return and a
> newline. This approach does not support CSV files that have records spanning
> multiple lines, because a field may contain a newline inside its double
> quotes.
> This Stack Overflow question is relevant for a feature that should be added to
> Apache Beam:
> [https://stackoverflow.com/questions/51439189/how-to-read-large-csv-with-beam]
> I can think of two libraries we could use for handling CSV files. The first
> one is the Apache Commons CSV library. Here is some example code which uses
> the CSVRecord class for reading and writing CSV records:
> {{PipelineOptions options = PipelineOptionsFactory.create();}}
> {{Pipeline pipeline = Pipeline.create(options);}}
> {{PCollection<CSVRecord> records = pipeline.apply("ReadCSV", CSVIO.read().from("input.csv"));}}
> {{records.apply("WriteCSV", CSVIO.write().to("output.csv"));}}
> Another library we could use is Jackson CSV, which allows users to specify
> schemas for the columns:
> [https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv]
> The crux of the problem is this: can we read and write large CSV files in
> parallel, by splitting the records and distributing them to many workers? If so,
> would it be good to have a feature where Apache Beam supports reading/writing
> CSV files?
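The multi-line-record problem described in the issue is easy to demonstrate with a stdlib-only sketch (illustrative only; this is neither Beam nor Commons CSV code, and it ignores escaped quotes and CRLF handling): a newline only terminates a record when it falls outside double quotes, which is exactly why line-by-line reading à la TextIO over-counts records.

```java
import java.util.ArrayList;
import java.util.List;

public class CsvRecordSplitter {
    // Split CSV text into logical records: a '\n' ends a record only when we
    // are outside double quotes. Doubled quotes ("") toggle the flag twice,
    // so they do not change the in/out state across the pair.
    public static List<String> splitRecords(String csv) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : csv.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;
            }
            if (c == '\n' && !inQuotes) {
                records.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            records.add(current.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        String csv = "id,note\n1,\"line one\nline two\"\n2,plain";
        // Naive newline splitting sees 4 "lines"...
        System.out.println(csv.split("\n").length);   // 4
        // ...but there are only 3 logical CSV records.
        System.out.println(splitRecords(csv).size()); // 3
    }
}
```

The parallel-read question in the issue is harder than this sketch: without reading from the start of the file (or extra conventions), a worker dropped at an arbitrary byte offset cannot tell whether it landed inside or outside a quoted field.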
[jira] [Work logged] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?focusedWorklogId=435359&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435359 ]

ASF GitHub Bot logged work on BEAM-9468:
Author: ASF GitHub Bot
Created on: 20/May/20 06:17
Start Date: 20/May/20 06:17
Worklog Time Spent: 10m

Work Description: jaketf edited a comment on pull request #11339:
URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002
The issue with FhirIOReadIT seems to be some misuse of [TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html). The [Example Job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing) clearly has the expected >2000 elements added to the "waitForAnyMessage" task, but the success signal never gets published to the results topic. Notably there are job-level warnings about metric descriptors and [warnings in shuffle logs](https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054&timestamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z) which warn:
```
"Update range task returned 'invalid argument'. Assuming lost lease for work with id 5061980071068333770 (expiration time: 1589940982000, now: 1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors."
```
The [docs](https://cloud.google.com/dataflow/docs/guides/common-errors#bad-request-shuffler-logs) say this can be ignored, but it smells suspicious here. This is orthogonal to the behavior being tested.
Investigating other means of performing this test.

Issue Time Tracking
---
Worklog Id: (was: 435359)
Time Spent: 48h (was: 47h 50m)
[jira] [Updated] (BEAM-10030) Add CSVIO for Java SDK
[ https://issues.apache.org/jira/browse/BEAM-10030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Saurabh Joshi updated BEAM-10030:
--
Component/s: sdk-java-core

> Add CSVIO for Java SDK
> --
> Key: BEAM-10030
> URL: https://issues.apache.org/jira/browse/BEAM-10030
> Project: Beam
> Issue Type: New Feature
> Components: io-ideas, sdk-java-core
> Reporter: Saurabh Joshi
> Priority: P2
> Attachments: CSVSource.java
[jira] [Commented] (BEAM-10030) Add CSVIO for Java SDK
[ https://issues.apache.org/jira/browse/BEAM-10030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111805#comment-17111805 ]

Saurabh Joshi commented on BEAM-10030:
--
There is a Jira issue that asks for CSV support in Go: https://issues.apache.org/jira/browse/BEAM-6371

> Add CSVIO for Java SDK
> --
> Key: BEAM-10030
> URL: https://issues.apache.org/jira/browse/BEAM-10030
> Project: Beam
> Issue Type: New Feature
> Components: io-ideas
> Reporter: Saurabh Joshi
> Priority: P2
> Attachments: CSVSource.java
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435357&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435357 ]

ASF GitHub Bot logged work on BEAM-9603:
Author: ASF GitHub Bot
Created on: 20/May/20 06:00
Start Date: 20/May/20 06:00
Worklog Time Spent: 10m

Work Description: y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427759019

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }

-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }

     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment: probably makes sense to keep the set shortcut since it is the most frequently used one.

Issue Time Tracking
---
Worklog Id: (was: 435357)
Time Spent: 3h 40m (was: 3.5h)

> Support Dynamic Timer in Java SDK over FnApi
>
> Key: BEAM-9603
> URL: https://issues.apache.org/jira/browse/BEAM-9603
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-harness
> Reporter: Boyuan Zhang
> Assignee: Yichi Zhang
> Priority: P2
> Time Spent: 3h 40m
> Remaining Estimate: 0h
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435356&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435356 ]

ASF GitHub Bot logged work on BEAM-9603:
Author: ASF GitHub Bot
Created on: 20/May/20 05:58
Start Date: 20/May/20 05:58
Worklog Time Spent: 10m

Work Description: y1chi commented on a change in pull request #11756 (on the same FnApiDoFnRunner.java hunk quoted in the worklog above):
URL: https://github.com/apache/beam/pull/11756#discussion_r427758607

Review comment: This is the interface required by TimerMap though:
https://github.com/apache/beam/blob/9108832cf1cb57161997e16190dbc6eccdc10492/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerMap.java#L25

Issue Time Tracking
---
Worklog Id: (was: 435356)
Time Spent: 3.5h (was: 3h 20m)
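The TimerMap semantics under review can be modeled with a toy, stdlib-only sketch (this is not Beam's implementation; all names besides `set`'s signature shape are illustrative): each dynamic timer tag maps to at most one pending firing time, and `set(tag, time)` is the create-or-reschedule shortcut the reviewers want to keep.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class TimerMapModel {
    // One pending firing time per dynamic timer tag.
    private final Map<String, Instant> pending = new HashMap<>();

    // The "set shortcut": creates the timer for a new tag, or reschedules
    // an existing one by overwriting its firing time.
    public void set(String dynamicTimerTag, Instant absoluteTime) {
        pending.put(dynamicTimerTag, absoluteTime);
    }

    public Instant firingTime(String dynamicTimerTag) {
        return pending.get(dynamicTimerTag);
    }

    public static void main(String[] args) {
        TimerMapModel timers = new TimerMapModel();
        timers.set("timer-a", Instant.parse("2020-05-20T06:00:00Z"));
        timers.set("timer-a", Instant.parse("2020-05-20T07:00:00Z")); // reschedules, no second timer
        System.out.println(timers.firingTime("timer-a")); // 2020-05-20T07:00:00Z
    }
}
```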
[jira] [Work logged] (BEAM-8889) Make GcsUtil use GoogleCloudStorage
[ https://issues.apache.org/jira/browse/BEAM-8889?focusedWorklogId=435355&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435355 ]

ASF GitHub Bot logged work on BEAM-8889:
Author: ASF GitHub Bot
Created on: 20/May/20 05:56
Start Date: 20/May/20 05:56
Worklog Time Spent: 10m

Work Description: chamikaramj commented on pull request #11651:
URL: https://github.com/apache/beam/pull/11651#issuecomment-631254015
Have you run the linkage checker? https://cwiki.apache.org/confluence/display/BEAM/Dependency+Upgrades

Issue Time Tracking
---
Worklog Id: (was: 435355)
Remaining Estimate: 134h (was: 134h 10m)
Time Spent: 34h (was: 33h 50m)

> Make GcsUtil use GoogleCloudStorage
> ---
> Key: BEAM-8889
> URL: https://issues.apache.org/jira/browse/BEAM-8889
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Affects Versions: 2.16.0
> Reporter: Esun Kim
> Assignee: VASU NORI
> Priority: P2
> Labels: gcs
> Original Estimate: 168h
> Time Spent: 34h
> Remaining Estimate: 134h
>
> [GcsUtil|https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java]
> is the primary class for accessing Google Cloud Storage in Apache Beam. The
> current implementation directly creates GoogleCloudStorageReadChannel and
> GoogleCloudStorageWriteChannel by itself to read and write GCS data, rather
> than using
> [GoogleCloudStorage|https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorage.java],
> an abstract class providing basic IO capability that eventually creates the
> channel objects. This request is about updating GcsUtil to use
> GoogleCloudStorage to create read and write channels, which is expected to be
> more flexible because it can easily pick up new changes, e.g. a new channel
> implementation using a new protocol, without code changes.
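The design choice described, depending on an abstraction that creates the channels rather than instantiating concrete channel classes directly, can be sketched with hypothetical, stdlib-only types (none of these names are the real GcsUtil or gcsio API):

```java
public class StorageAbstractionSketch {
    // Stand-in for the abstract GoogleCloudStorage-style class: callers ask it
    // to open a path instead of constructing a concrete channel themselves.
    interface Storage {
        String open(String path); // a real version would return a channel
    }

    static class HttpStorage implements Storage {
        @Override public String open(String path) { return "http:" + path; }
    }

    // A "new channel implementation using a new protocol" drops in without
    // touching any caller code.
    static class GrpcStorage implements Storage {
        @Override public String open(String path) { return "grpc:" + path; }
    }

    // GcsUtil-style caller: written once against the abstraction.
    static String readVia(Storage storage, String path) {
        return storage.open(path);
    }

    public static void main(String[] args) {
        System.out.println(readVia(new HttpStorage(), "gs://bucket/obj")); // http:gs://bucket/obj
        System.out.println(readVia(new GrpcStorage(), "gs://bucket/obj")); // grpc:gs://bucket/obj
    }
}
```

This is the flexibility the issue asks for: swapping the channel-creation strategy requires a new `Storage` implementation, not edits to every read/write call site.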
[jira] [Work logged] (BEAM-8889) Make GcsUtil use GoogleCloudStorage
[ https://issues.apache.org/jira/browse/BEAM-8889?focusedWorklogId=435354&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435354 ] ASF GitHub Bot logged work on BEAM-8889: Author: ASF GitHub Bot Created on: 20/May/20 05:52 Start Date: 20/May/20 05:52 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631252696 Run Python2_PVR_Flink PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435354) Remaining Estimate: 134h 10m (was: 134h 20m) Time Spent: 33h 50m (was: 33h 40m) > Make GcsUtil use GoogleCloudStorage > --- > > Key: BEAM-8889 > URL: https://issues.apache.org/jira/browse/BEAM-8889 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Affects Versions: 2.16.0 >Reporter: Esun Kim >Assignee: VASU NORI >Priority: P2 > Labels: gcs > Original Estimate: 168h > Time Spent: 33h 50m > Remaining Estimate: 134h 10m > > [GcsUtil|https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java] > is a primary class to access Google Cloud Storage on Apache Beam. Current > implementation directly creates GoogleCloudStorageReadChannel and > GoogleCloudStorageWriteChannel by itself to read and write GCS data rather > than using > [GoogleCloudStorage|https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorage.java] > which is an abstract class providing basic IO capability which eventually > creates channel objects. 
This request is about updating GcsUtil to use > GoogleCloudStorage to create read and write channels, which is expected > to be more flexible because it can easily pick up new changes, e.g. a new > channel implementation using a new protocol, without code changes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8889) Make GcsUtil use GoogleCloudStorage
[ https://issues.apache.org/jira/browse/BEAM-8889?focusedWorklogId=435353&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435353 ] ASF GitHub Bot logged work on BEAM-8889: Author: ASF GitHub Bot Created on: 20/May/20 05:52 Start Date: 20/May/20 05:52 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631252638 Run Python2_PVR_Flink PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435353) Remaining Estimate: 134h 20m (was: 134.5h) Time Spent: 33h 40m (was: 33.5h) > Make GcsUtil use GoogleCloudStorage > --- > > Key: BEAM-8889 > URL: https://issues.apache.org/jira/browse/BEAM-8889 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Affects Versions: 2.16.0 >Reporter: Esun Kim >Assignee: VASU NORI >Priority: P2 > Labels: gcs > Original Estimate: 168h > Time Spent: 33h 40m > Remaining Estimate: 134h 20m > > [GcsUtil|https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java] > is the primary class for accessing Google Cloud Storage in Apache Beam. The current > implementation directly creates GoogleCloudStorageReadChannel and > GoogleCloudStorageWriteChannel itself to read and write GCS data, rather > than using > [GoogleCloudStorage|https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorage.java], > an abstract class providing basic IO capability that eventually > creates the channel objects.
This request is about updating GcsUtil to use > GoogleCloudStorage to create read and write channels, which is expected > to be more flexible because it can easily pick up new changes, e.g. a new > channel implementation using a new protocol, without code changes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
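[Editor's note] The request above boils down to a dependency-inversion refactor: route channel creation through the GoogleCloudStorage abstraction instead of constructing concrete channel classes in GcsUtil. The following is a plain-Java sketch of that pattern, not Beam's actual API; the `BlobStorage` interface and `readAll` helper are hypothetical stand-ins.

```java
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

/** Sketch of the indirection BEAM-8889 proposes: channel creation behind an interface. */
public class ChannelFactorySketch {

  /** Hypothetical stand-in for the GoogleCloudStorage abstraction: one place that creates channels. */
  interface BlobStorage {
    ReadableByteChannel open(String path);
  }

  /** A caller that, like GcsUtil after the refactor, never constructs channel classes directly. */
  static String readAll(BlobStorage storage, String path) {
    try (ReadableByteChannel ch = storage.open(path)) {
      StringBuilder out = new StringBuilder();
      ByteBuffer buf = ByteBuffer.allocate(64);
      while (ch.read(buf) != -1) {
        buf.flip();                                   // switch buffer to read mode
        out.append(StandardCharsets.UTF_8.decode(buf));
        buf.clear();                                  // reuse buffer for the next read
      }
      return out.toString();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    // An in-memory implementation; a new protocol would simply be another
    // implementation, picked up by readAll without any caller-side code change.
    BlobStorage inMemory =
        path -> Channels.newChannel(
            new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8)));
    System.out.println(readAll(inMemory, "gs://bucket/object")); // → hello
  }
}
```

Because `readAll` depends only on the interface, swapping in a channel implementation that speaks a new protocol requires no change to the reading code, which is exactly the flexibility the issue describes.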
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=435351&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435351 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 20/May/20 05:48 Start Date: 20/May/20 05:48 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #11757: URL: https://github.com/apache/beam/pull/11757#issuecomment-631251258 R: @robertwb @ihji This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435351) Time Spent: 20h 10m (was: 20h) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: P1 > Time Spent: 20h 10m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=435352&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435352 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 20/May/20 05:48 Start Date: 20/May/20 05:48 Worklog Time Spent: 10m Work Description: chamikaramj commented on a change in pull request #11740: URL: https://github.com/apache/beam/pull/11740#discussion_r427755500

## File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py

@@ -310,15 +312,15 @@ def __init__(
         environment_payload = proto_utils.parse_Bytes(
             environment.payload, beam_runner_api_pb2.DockerPayload)
         container_image_url = environment_payload.container_image
-        if container_image_url == pipeline_sdk_container_image:
-          # This was already added
+        if container_image_url in already_added_containers:
+          # Do not add the pipeline environment again.

Review comment: Sent https://github.com/apache/beam/pull/11757. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435352) Time Spent: 20h 20m (was: 20h 10m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: P1 > Time Spent: 20h 20m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
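[Editor's note] The diff above replaces a single-image equality check with a membership test over all containers added so far, so cross-language environments are deduplicated too. A minimal language-agnostic sketch of that dedup (names hypothetical, shown in Java rather than the Python of the actual change):

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Sketch of the dedup in the review: track every image already added, not just one. */
public class ContainerDedupSketch {

  /** Returns the input URLs with duplicates dropped, preserving first-seen order. */
  static List<String> dedupe(List<String> containerImageUrls) {
    Set<String> alreadyAdded = new LinkedHashSet<>();
    List<String> toAdd = new ArrayList<>();
    for (String url : containerImageUrls) {
      if (alreadyAdded.contains(url)) {
        continue; // Do not add the same environment again.
      }
      alreadyAdded.add(url);
      toAdd.add(url);
    }
    return toAdd;
  }

  public static void main(String[] args) {
    // The pipeline SDK image plus an expansion-service image, with a repeat.
    System.out.println(dedupe(List.of("sdk:2.22", "java:2.22", "sdk:2.22")));
    // → [sdk:2.22, java:2.22]
  }
}
```

Checking a set of everything added, rather than comparing against one known image, is what makes the logic safe once a pipeline can carry environments from more than one SDK.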
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=435350&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435350 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 20/May/20 05:47 Start Date: 20/May/20 05:47 Worklog Time Spent: 10m Work Description: chamikaramj opened a new pull request #11757: URL: https://github.com/apache/beam/pull/11757 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch): build-status badge table for the Go and Java SDK post-commit jobs on builds.apache.org, covering the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners (badge table omitted).
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435348&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435348 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 05:31 Start Date: 20/May/20 05:31 Worklog Time Spent: 10m Work Description: boyuanzz commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427750709

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }

-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }

     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment: Thanks for the clarification!
I still think we should have a better API (and doc) here, like `getTimer(timerId)`? And I would prefer not exposing `set()` since `getTimer()` is the more recommended way. What do you think? You can also start a discussion thread on the dev list since it's a user-facing API. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435348) Time Spent: 3h 20m (was: 3h 10m) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Assignee: Yichi Zhang >Priority: P2 > Time Spent: 3h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
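[Editor's note] The review thread above debates two API shapes for dynamic timers: a `set(tag, time)` shortcut on the timer map versus fetching a per-tag timer first via something like `getTimer(tag)` and calling its methods. A plain-Java sketch of the two shapes (all names hypothetical; this is not Beam's TimerMap implementation):

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch of the two dynamic-timer API shapes discussed in the review. */
public class TimerMapSketch {

  /** A single dynamic timer; mirrors the per-tag object a getTimer()-style API returns. */
  static final class SketchTimer {
    Instant fireTime;

    void set(Instant absoluteTime) {
      fireTime = absoluteTime;
    }
  }

  /** A family of timers keyed by a dynamic tag chosen at runtime. */
  static final class SketchTimerMap {
    private final Map<String, SketchTimer> timers = new HashMap<>();

    /** getTimer()-style access: fetch the per-tag timer, then call its own API. */
    SketchTimer get(String dynamicTimerTag) {
      return timers.computeIfAbsent(dynamicTimerTag, t -> new SketchTimer());
    }

    /** set()-style shortcut: equivalent to get(tag).set(time). */
    void set(String dynamicTimerTag, Instant absoluteTime) {
      get(dynamicTimerTag).set(absoluteTime);
    }
  }

  public static void main(String[] args) {
    SketchTimerMap map = new SketchTimerMap();
    map.set("retry", Instant.parse("2020-05-20T00:00:00Z"));       // shortcut form
    map.get("cleanup").set(Instant.parse("2020-05-21T00:00:00Z")); // per-timer form
    System.out.println(map.get("retry").fireTime);
  }
}
```

The per-timer form scales better as timers grow richer options (holds, output timestamps), since each new option lives on the timer object rather than as another overload of the map's `set`, which matches the reviewer's preference for `getTimer()`.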
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435343&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435343 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 04:44 Start Date: 20/May/20 04:44 Worklog Time Spent: 10m Work Description: y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427738041

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }

-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }

     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment: I believe we can always call get() first to access the FnApiTimer and call its
APIs. Probably that's sufficient? I feel adding more shortcuts only makes the API slightly more user-friendly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435343) Time Spent: 3h 10m (was: 3h) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Assignee: Yichi Zhang >Priority: P2 > Time Spent: 3h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435342&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435342 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 04:41 Start Date: 20/May/20 04:41 Worklog Time Spent: 10m Work Description: y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427737289

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

@@ -962,16 +971,25 @@ private Progress getProgress() {
             .build());
   }

-  private void processTimer(String timerId, TimeDomain timeDomain, Timer timer) {
+  private void processTimer(
+      String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer timer) {
     currentTimer = timer;
     currentTimeDomain = timeDomain;
     onTimerContext = new OnTimerContext<>(timer.getUserKey());
+    String timerId =
+        timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)

Review comment: It will be ignored anyway; apparently only one of timerId or timerFamilyId takes effect. https://github.com/apache/beam/blob/591de3473144de54beef0932131025e2a4d8504b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L223 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435342) Time Spent: 3h (was: 2h 50m) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Assignee: Yichi Zhang >Priority: P2 > Time Spent: 3h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9723) [Java] PTransform that connects to Cloud DLP deidentification service
[ https://issues.apache.org/jira/browse/BEAM-9723?focusedWorklogId=435336&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435336 ] ASF GitHub Bot logged work on BEAM-9723: Author: ASF GitHub Bot Created on: 20/May/20 04:15 Start Date: 20/May/20 04:15 Worklog Time Spent: 10m Work Description: tysonjh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427471127

## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java

@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP

Review comment: It's good practice to start all javadoc comments with a short summary fragment. There are more details in Google's Java style guide [1]. For example, I would phrase the summary fragment for this class as: 'Batches input rows to reduce the number of requests sent to the Cloud DLP service.' Would you please go through this CL and add such comments to public classes and methods? I personally like to add them to all classes, non-trivial methods, and tricky blocks of code, regardless of access modifiers. [1] https://google.github.io/styleguide/javaguide.html#s7.2-summary-fragment This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435336) Time Spent: 3h 40m (was: 3.5h) > [Java] PTransform that connects to Cloud DLP deidentification service > - > > Key: BEAM-9723 > URL: https://issues.apache.org/jira/browse/BEAM-9723 > Project: Beam > Issue Type: Sub-task > Components: io-java-gcp >Reporter: Michał Walenia >Assignee: Michał Walenia >Priority: P2 > Time Spent: 3h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
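[Editor's note] The review above asks for javadoc summary fragments per the Google Java style guide. A small self-contained illustration (class and method names hypothetical, not from the PR under review):

```java
/**
 * Batches input rows to reduce the number of requests sent to the Cloud DLP service.
 *
 * <p>The first sentence above is the "summary fragment": a concise description that
 * javadoc tooling lifts into class and method indexes, so it should stand on its own.
 */
public class BatchingExample {

  /** Returns the number of requests needed to send {@code items} items in groups of {@code batchSize}. */
  static int batchCount(int items, int batchSize) {
    return (items + batchSize - 1) / batchSize; // ceiling division
  }

  public static void main(String[] args) {
    System.out.println(batchCount(101, 50)); // → 3 (two full batches plus a remainder)
  }
}
```

Note the fragment is not a full sentence starting with "This class..."; it reads as if completing "Returns..." or "Batches...", which is the convention the style guide's §7.2 describes.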
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435335&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435335 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 20/May/20 04:08 Start Date: 20/May/20 04:08 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11596: URL: https://github.com/apache/beam/pull/11596#issuecomment-631224035 Run Java PostCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435335) Time Spent: 8h 40m (was: 8.5h) > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 8h 40m > Remaining Estimate: 0h > > Currently the List Messages API paginates through in a single ProcessElement > call. > However, we could get a restriction based on createTime using the Messages.List > filter and orderBy. > > This is in line with the future roadmap: if the HL7v2 bulk export API becomes > available, it should allow splitting (e.g. on the create-time dimension). > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dynamically splittable via a splittable DoFn (sexy, Beam-idiomatic: makes > optimization the runner's problem, but potentially unnecessarily complex for this > use case) > 2. static splitting on some time partition, e.g. finding the earliest > createTime and emitting a PCollection of 1-hour partitions, then paginating > through each hour of data within the time frame that the store spans, in a > separate ProcessElement (easy to implement but will likely have hot keys / > stragglers based on "busy hours"). -- This message was sent by Atlassian Jira (v8.3.4#803005)
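[Editor's note] Option 2 above (static splitting on fixed time partitions) can be sketched with plain `java.time` arithmetic; each emitted partition would then be paginated through in its own ProcessElement call. The `Partition` class and `hourlyPartitions` helper below are hypothetical, not part of HL7v2IO:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Sketch of static time-partitioned splitting over a store's createTime range. */
public class CreateTimePartitioner {

  /** One half-open [start, end) slice of the store's create-time range. */
  static final class Partition {
    final Instant start;
    final Instant end;

    Partition(Instant start, Instant end) {
      this.start = start;
      this.end = end;
    }
  }

  /** Splits [earliest, latest) into one-hour partitions; the last one may be shorter. */
  static List<Partition> hourlyPartitions(Instant earliest, Instant latest) {
    List<Partition> partitions = new ArrayList<>();
    for (Instant t = earliest; t.isBefore(latest); t = t.plus(Duration.ofHours(1))) {
      Instant end = t.plus(Duration.ofHours(1));
      partitions.add(new Partition(t, end.isBefore(latest) ? end : latest));
    }
    return partitions;
  }

  public static void main(String[] args) {
    Instant earliest = Instant.parse("2020-05-20T00:00:00Z");
    Instant latest = Instant.parse("2020-05-20T03:30:00Z");
    // Three full hours plus a final half-hour remainder.
    System.out.println(hourlyPartitions(earliest, latest).size()); // → 4
  }
}
```

As the issue notes, fixed partitions are simple but skew-prone: a "busy hour" becomes a hot key, which is exactly the straggler risk a dynamically splittable DoFn would avoid.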
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435333&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435333 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 04:04 Start Date: 20/May/20 04:04 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631223098 Run Java Flink PortableValidatesRunner Streaming This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435333) Time Spent: 2h 50m (was: 2h 40m) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Assignee: Yichi Zhang >Priority: P2 > Time Spent: 2h 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435332&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435332 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 04:04 Start Date: 20/May/20 04:04 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631223003 Run Java Flink PortableValidatesRunner Batch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435332) Time Spent: 2h 40m (was: 2.5h) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Assignee: Yichi Zhang >Priority: P2 > Time Spent: 2h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyuan Zhang reassigned BEAM-9603: -- Assignee: Yichi Zhang > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Assignee: Yichi Zhang >Priority: P2 > Time Spent: 2.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435331&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435331 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 03:52 Start Date: 20/May/20 03:52 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631220097 Run Java Flink PortableValidatesRunner Batch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435331) Time Spent: 2.5h (was: 2h 20m) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Priority: P2 > Time Spent: 2.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435329&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435329 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 03:51 Start Date: 20/May/20 03:51 Worklog Time Spent: 10m Work Description: boyuanzz closed pull request #11756: URL: https://github.com/apache/beam/pull/11756 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435329) Time Spent: 2h 10m (was: 2h) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Priority: P2 > Time Spent: 2h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435330&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435330 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 03:51 Start Date: 20/May/20 03:51 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631219846 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435330) Time Spent: 2h 20m (was: 2h 10m) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Priority: P2 > Time Spent: 2h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435328&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435328 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 03:51 Start Date: 20/May/20 03:51 Worklog Time Spent: 10m Work Description: y1chi opened a new pull request #11756: URL: https://github.com/apache/beam/pull/11756 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435327&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435327 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 03:50 Start Date: 20/May/20 03:50 Worklog Time Spent: 10m Work Description: boyuanzz commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427720451 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -962,16 +971,25 @@ private Progress getProgress() { .build()); } - private void processTimer(String timerId, TimeDomain timeDomain, Timer timer) { + private void processTimer( + String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer timer) { currentTimer = timer; currentTimeDomain = timeDomain; onTimerContext = new OnTimerContext<>(timer.getUserKey()); +String timerId = +timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX) Review comment: If the `timerIdOrTimerFamilyId ` is for a timer family, should the timerId be the `timer.dynamicTimerTag`? 
## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) { } } - private static class FnApiTimerMap implements TimerMap { -FnApiTimerMap() {} + private class FnApiTimerMap implements TimerMap { +private final String timerFamilyId; +private final K userKey; +private final TimeDomain timeDomain; +private final Instant elementTimestampOrTimerHoldTimestamp; +private final Instant elementTimestampOrTimerFireTimestamp; +private final BoundedWindow boundedWindow; +private final PaneInfo paneInfo; + +FnApiTimerMap( +String timerFamilyId, +K userKey, +BoundedWindow boundedWindow, +Instant elementTimestampOrTimerHoldTimestamp, +Instant elementTimestampOrTimerFireTimestamp, +PaneInfo paneInfo) { + this.timerFamilyId = timerFamilyId; + this.userKey = userKey; + this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp; + this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp; + this.boundedWindow = boundedWindow; + this.paneInfo = paneInfo; + + TimerFamilyDeclaration timerFamilyDeclaration = + doFnSignature.timerFamilyDeclarations().get(timerFamilyId); + this.timeDomain = Review comment: Similar to `FnApiTimer` above, we should have `timeDomain` from proto, right? ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception { // Extract out relevant TimerFamilySpec information in preparation for execution. 
for (Map.Entry entry : parDoPayload.getTimerFamilySpecsMap().entrySet()) { -String timerFamilyId = entry.getKey(); -TimeDomain timeDomain = -DoFnSignatures.getTimerSpecOrThrow( -doFnSignature.timerDeclarations().get(timerFamilyId), doFn) -.getTimeDomain(); +String timerIdOrTimerFamilyId = entry.getKey(); +TimeDomain timeDomain; +if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) { + timeDomain = Review comment: The `TimerFamilySpec` should have a `time_domain` field. Maybe we could do something similar to https://github.com/apache/beam/blob/1de50c348706ed25af2bab9c9477d7d4f36ef8bf/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java#L657-L668 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception { // Extract out relevant TimerFamilySpec information in preparation for execution. for (Map.Entry entry : parDoPayload.getTimerFamilySpecsMap().entrySet()) { -String timerFamilyId = entry.getKey(); -TimeDomain timeDomain = -DoFnSignatures.getTimerSpecOrThrow( -doFnSignature.timerDeclarations().get(timerFamilyId), doFn) -.getTimeDomain(); +String timerIdOrTimerFamilyId = entry.getKey(); +TimeDomain timeDomain; +if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) { + timeDomain = + DoFnSignatures.getTimerFamilySpecOrThrow( + doFnSignature.timerFamilyDeclarations().get(timerIdOrTimerFamilyId), doFn) + .getTimeDomain(); +} else { + timeDomain = +
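The review thread above turns on two points: a timer-family ID is distinguished from a plain timer ID by a declaration prefix, and for a family the effective per-timer identity should come from the fired timer's dynamic timer tag. A minimal plain-Java sketch of that dispatch logic follows; the class name and the `"tfs-"` prefix value are illustrative assumptions, not Beam's actual constants.

```java
// Illustrative sketch (not Beam's actual classes) of the routing logic
// discussed in the review: IDs carrying a timer-family marker are handled
// differently from plain timer IDs. The prefix "tfs-" is a placeholder
// assumption, not taken from TimerFamilyDeclaration.PREFIX in Beam source.
public class TimerIdDispatch {
    static final String TIMER_FAMILY_PREFIX = "tfs-";

    static boolean isTimerFamily(String timerIdOrTimerFamilyId) {
        return timerIdOrTimerFamilyId.startsWith(TIMER_FAMILY_PREFIX);
    }

    // For a timer family, the per-timer identity comes from the dynamic
    // timer tag on the fired timer, as the review comment suggests;
    // for a plain timer, the ID itself is the timer ID.
    static String resolveTimerId(String idOrFamilyId, String dynamicTimerTag) {
        return isTimerFamily(idOrFamilyId) ? dynamicTimerTag : idOrFamilyId;
    }

    public static void main(String[] args) {
        System.out.println(resolveTimerId("tfs-myFamily", "tag-1")); // family: tag wins
        System.out.println(resolveTimerId("myTimer", "tag-1"));      // plain timer id
    }
}
```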
[jira] [Updated] (BEAM-9887) Throw IllegalArgumentException when building Row with logical types with Invalid input
[ https://issues.apache.org/jira/browse/BEAM-9887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rahul Patwari updated BEAM-9887: Fix Version/s: (was: 2.21.0) 2.22.0 > Throw IllegalArgumentException when building Row with logical types with > Invalid input > --- > > Key: BEAM-9887 > URL: https://issues.apache.org/jira/browse/BEAM-9887 > Project: Beam > Issue Type: Bug > Components: sdk-java-core >Reporter: Rahul Patwari >Assignee: Rahul Patwari >Priority: P2 > Fix For: 2.22.0 > > Time Spent: 3h 50m > Remaining Estimate: 0h > > The schema.logicaltypes.FixedBytes logical type expects an argument - the length > of the byte[]. > When an invalid input value (with length < expectedLength) is provided while > building a Row with the FixedBytes logical type, an IllegalArgumentException is > expected, but the exception is not thrown. The code below illustrates the > behaviour: > {code:java} > Schema schema = Schema.builder().addLogicalTypeField("char", > FixedBytes.of(10)).build(); > byte[] byteArray = {1, 2, 3, 4, 5}; > Row row = Row.withSchema(schema).withFieldValue("char", byteArray).build(); > System.out.println(Arrays.toString(row.getLogicalTypeValue("char", > byte[].class))); > {code} > The above code prints "[1, 2, 3, 4, 5]" with length 5 to the console, whereas > the expected length of FixedBytes is 10. > The code is run on the master branch. > The behaviour is as expected with the 2.20.0 release. -- This message was sent by Atlassian Jira (v8.3.4#803005)
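The ticket above expects FixedBytes to reject a byte[] whose length does not match the declared length. The following plain-Java sketch shows the shape of that expected check; it is a hypothetical stand-in for illustration, not Beam's actual FixedBytes implementation.

```java
public class FixedBytesCheck {
    // Hypothetical stand-in for the length validation the ticket expects
    // FixedBytes to perform when a Row is built (not Beam's actual code).
    static byte[] checkLength(byte[] value, int expectedLength) {
        if (value.length != expectedLength) {
            throw new IllegalArgumentException(
                "FixedBytes(" + expectedLength + ") got value of length " + value.length);
        }
        return value;
    }

    public static void main(String[] args) {
        checkLength(new byte[10], 10); // correct length: passes
        try {
            // the ticket's failing case: length 5 against FixedBytes.of(10)
            checkLength(new byte[] {1, 2, 3, 4, 5}, 10);
            System.out.println("no exception thrown (the reported bug)");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```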
[jira] [Work logged] (BEAM-9679) Core Transforms | Go SDK Code Katas
[ https://issues.apache.org/jira/browse/BEAM-9679?focusedWorklogId=435326&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435326 ] ASF GitHub Bot logged work on BEAM-9679: Author: ASF GitHub Bot Created on: 20/May/20 03:38 Start Date: 20/May/20 03:38 Worklog Time Spent: 10m Work Description: henryken commented on pull request #11734: URL: https://github.com/apache/beam/pull/11734#issuecomment-631216779 @damondouglas, please help to update the Stepik course. Afterwards, we can merge this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435326) Time Spent: 1.5h (was: 1h 20m) > Core Transforms | Go SDK Code Katas > --- > > Key: BEAM-9679 > URL: https://issues.apache.org/jira/browse/BEAM-9679 > Project: Beam > Issue Type: Sub-task > Components: katas, sdk-go >Reporter: Damon Douglas >Assignee: Damon Douglas >Priority: P2 > Time Spent: 1.5h > Remaining Estimate: 0h > > A kata devoted to core Beam transform patterns after > [https://github.com/apache/beam/tree/master/learning/katas/java/Core%20Transforms], > where the takeaway is an individual's ability to master the following in > an Apache Beam pipeline using the Golang SDK. > * Branching > * [CoGroupByKey|https://github.com/damondouglas/beam/tree/BEAM-9679-core-transform-groupbykey] > * Combine > * Composite Transform > * DoFn Additional Parameters > * Flatten > * GroupByKey > * [Map|https://github.com/apache/beam/pull/11564] > * Partition > * Side Input -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?focusedWorklogId=435325&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435325 ] ASF GitHub Bot logged work on BEAM-9468: Author: ASF GitHub Bot Created on: 20/May/20 03:27 Start Date: 20/May/20 03:27 Worklog Time Spent: 10m Work Description: jaketf edited a comment on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002 The issue with FhirIOReadIT seems to be some misuse of [TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html) [Example Job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing) clearly has elements added to the "waitForAnyMessage" task but the success signal never gets published to the results topic. Notably there are job level warnings about metric descriptors and [warnings in shuffle logs](https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054×tamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z) which warns: ``` "Update range task returned 'invalid argument'. Assuming lost lease for work with id 5061980071068333770 (expiration time: 1589940982000, now: 1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors."; ``` the [docs](https://cloud.google.com/dataflow/docs/guides/common-errors#bad-request-shuffler-logs) say this can be ignored but smells suspicious here. This is orthogonal to the behavior being tested. 
Investigating other means of performing this test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435325) Time Spent: 47h 50m (was: 47h 40m) > Add Google Cloud Healthcare API IO Connectors > - > > Key: BEAM-9468 > URL: https://issues.apache.org/jira/browse/BEAM-9468 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 47h 50m > Remaining Estimate: 0h > > Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud > Healthcare API|https://cloud.google.com/healthcare/docs/] > HL7v2IO > FHIRIO > DICOM -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9722) Add batch SnowflakeIO.Read to Java SDK
[ https://issues.apache.org/jira/browse/BEAM-9722?focusedWorklogId=435324&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435324 ] ASF GitHub Bot logged work on BEAM-9722: Author: ASF GitHub Bot Created on: 20/May/20 03:22 Start Date: 20/May/20 03:22 Worklog Time Spent: 10m Work Description: chamikaramj commented on a change in pull request #11360: URL: https://github.com/apache/beam/pull/11360#discussion_r427718773 ## File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java ## @@ -0,0 +1,735 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.snowflake; + +import static org.apache.beam.sdk.io.TextIO.readFiles; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import com.opencsv.CSVParser; +import com.opencsv.CSVParserBuilder; +import java.io.IOException; +import java.io.Serializable; +import java.security.PrivateKey; +import java.sql.Connection; +import java.sql.SQLException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.sql.DataSource; +import net.snowflake.client.jdbc.SnowflakeBasicDataSource; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials; +import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials; +import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials; +import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Wait; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * IO to read and write data on Snowflake. 
+ * + * SnowflakeIO uses the <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake + * JDBC driver</a> under the hood, but data isn't read/written using JDBC directly. Instead, + * SnowflakeIO uses dedicated COPY operations to read/write data from/to a cloud bucket. + * Currently only Google Cloud Storage is supported. + * + * To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a + * {@link DataSourceConfiguration} using {@link + * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials} might be + * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}. + * Additionally, one of {@link DataSourceConfiguration#withServerName(String)} or {@link + * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use. + * + * There are also other options available to configure the connection to Snowflake: + * + * + * {@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use + * {@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect + * to + * {@link DataSourceConfiguration#withSchema(String)} to specify which schema to use + * {@link DataSourceConfiguration#withRole(String)} to specify which role to use + * {@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the + * login + * {@link DataSourceConfiguration#withPortNumber(Integer)} to specify a custom port of Snowflake + *
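Putting the builder methods listed in the Javadoc above together, a configuration might look like the sketch below. It is assembled only from the method names the Javadoc mentions; the credentials line in particular is a placeholder, since the exact factory call is not shown in full there, and the server/warehouse/database values are made-up examples.

```java
// Configuration sketch using only the builder methods named in the Javadoc
// under review; treat it as illustrative, not a verified SnowflakeIO usage.
// SnowflakeCredentialsFactory usage below is a placeholder assumption.
SnowflakeCredentials credentials = SnowflakeCredentialsFactory.of(options); // placeholder

SnowflakeIO.DataSourceConfiguration config =
    SnowflakeIO.DataSourceConfiguration.create(credentials)
        .withServerName("myaccount.snowflakecomputing.com") // or .withUrl(...)
        .withWarehouse("MY_WAREHOUSE")  // which warehouse to use
        .withDatabase("MY_DB")          // which database to connect to
        .withSchema("PUBLIC")           // which schema to use
        .withRole("SYSADMIN")           // which role to use
        .withLoginTimeout(60)           // login timeout, in seconds (assumed unit)
        .withPortNumber(443);           // custom Snowflake port
```

Either `withServerName` or `withUrl` is required per the Javadoc; the remaining calls are optional.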
[jira] [Work logged] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?focusedWorklogId=435323&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435323 ] ASF GitHub Bot logged work on BEAM-9468: Author: ASF GitHub Bot Created on: 20/May/20 03:02 Start Date: 20/May/20 03:02 Worklog Time Spent: 10m Work Description: jaketf edited a comment on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002 The issue with FhirIOReadIT seems to be some misuse of [TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html) [Example Job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing) clearly has elements added to the "waitForAnyMessage" task but the success signal never gets published to the results topic. Notably there are [warnings in shuffle logs](https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054×tamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z) which warns: ``` "Update range task returned 'invalid argument'. Assuming lost lease for work with id 5061980071068333770 (expiration time: 1589940982000, now: 1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors."; ``` the [docs](https://cloud.google.com/dataflow/docs/guides/common-errors#bad-request-shuffler-logs) say this can be ignored but smells suspicious here. This is orthogonal to the behavior being tested. Investigating other means of performing this test. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435323) Time Spent: 47h 40m (was: 47.5h) > Add Google Cloud Healthcare API IO Connectors > - > > Key: BEAM-9468 > URL: https://issues.apache.org/jira/browse/BEAM-9468 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 47h 40m > Remaining Estimate: 0h > > Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud > Healthcare API|https://cloud.google.com/healthcare/docs/] > HL7v2IO > FHIRIO > DICOM -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9977) Build Kafka Read on top of Java SplittableDoFn
[ https://issues.apache.org/jira/browse/BEAM-9977?focusedWorklogId=435319&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435319 ] ASF GitHub Bot logged work on BEAM-9977: Author: ASF GitHub Bot Created on: 20/May/20 02:55 Start Date: 20/May/20 02:55 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11715: URL: https://github.com/apache/beam/pull/11715#issuecomment-631204566 Run Java PreCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435319) Time Spent: 3h (was: 2h 50m) > Build Kafka Read on top of Java SplittableDoFn > -- > > Key: BEAM-9977 > URL: https://issues.apache.org/jira/browse/BEAM-9977 > Project: Beam > Issue Type: New Feature > Components: io-java-kafka >Reporter: Boyuan Zhang >Assignee: Boyuan Zhang >Priority: P2 > Time Spent: 3h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?focusedWorklogId=435316&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435316 ] ASF GitHub Bot logged work on BEAM-9468: Author: ASF GitHub Bot Created on: 20/May/20 02:50 Start Date: 20/May/20 02:50 Worklog Time Spent: 10m Work Description: jaketf commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002 The issue with FhirIOReadIT seems to be some misuse of [TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html) [Example Job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing) clearly has elements added to the "waitForAnyMessage" task but the success signal never gets published to the results topic. This is tangential to the behavior being tested. Investigating other means of performing this test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435316) Time Spent: 47h 20m (was: 47h 10m) > Add Google Cloud Healthcare API IO Connectors > - > > Key: BEAM-9468 > URL: https://issues.apache.org/jira/browse/BEAM-9468 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 47h 20m > Remaining Estimate: 0h > > Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud > Healthcare API|https://cloud.google.com/healthcare/docs/] > HL7v2IO > FHIRIO > DICOM -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9468) Add Google Cloud Healthcare API IO Connectors
[ https://issues.apache.org/jira/browse/BEAM-9468?focusedWorklogId=435317&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435317 ] ASF GitHub Bot logged work on BEAM-9468: Author: ASF GitHub Bot Created on: 20/May/20 02:50 Start Date: 20/May/20 02:50 Worklog Time Spent: 10m Work Description: jaketf edited a comment on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002 The issue with FhirIOReadIT seems to be some misuse of [TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html) [Example Job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing) clearly has elements added to the "waitForAnyMessage" task but the success signal never gets published to the results topic. This is orthogonal to the behavior being tested. Investigating other means of performing this test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435317) Time Spent: 47.5h (was: 47h 20m) > Add Google Cloud Healthcare API IO Connectors > - > > Key: BEAM-9468 > URL: https://issues.apache.org/jira/browse/BEAM-9468 > Project: Beam > Issue Type: New Feature > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 47.5h > Remaining Estimate: 0h > > Add IO Transforms for the HL7v2, FHIR and DICOM stores in the [Google Cloud > Healthcare API|https://cloud.google.com/healthcare/docs/] > HL7v2IO > FHIRIO > DICOM -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435311&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435311 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 20/May/20 02:05 Start Date: 20/May/20 02:05 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11596: URL: https://github.com/apache/beam/pull/11596#issuecomment-631189470 Run Java PostCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435311) Time Spent: 8.5h (was: 8h 20m) > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 8.5h > Remaining Estimate: 0h > > Currently the List Messages API paginates through in a single ProcessElement > call. > However, we could get a restriction based on createTime using the Messages.List > filter and orderby. > > This is in line with the future roadmap, where the HL7v2 bulk export API becomes > available and should allow splitting (e.g. on the create-time dimension). > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: make > optimization the runner's problem, potentially unnecessarily complex for this > use case) > 2. static splitting on some time partition, e.g. finding the earliest > createTime and emitting a PCollection of 1-hour partitions, then paginating > through each hour of data within the time frame that the store spans, in a > separate ProcessElement. 
(easy to implement but will likely have hot keys / > stragglers based on "busy hours") > -- This message was sent by Atlassian Jira (v8.3.4#803005)
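Option 2 in the ticket above (static splitting into 1-hour partitions) can be sketched as plain Java: given the earliest createTime and an end bound, emit a list of hour-wide [start, end) ranges, each of which would then be listed in a separate ProcessElement call. Class and method names here are illustrative, not taken from the actual PR.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Sketch of the ticket's "static splitting" option: turn the store's time
// span into fixed one-hour restrictions. Names are illustrative only.
public class HourPartitions {
    // Returns [start, end) pairs covering [earliestCreateTime, end),
    // one hour wide except possibly the last remainder partition.
    static List<Instant[]> hourlyPartitions(Instant earliestCreateTime, Instant end) {
        List<Instant[]> partitions = new ArrayList<>();
        Instant start = earliestCreateTime;
        while (start.isBefore(end)) {
            Instant next = start.plus(Duration.ofHours(1));
            partitions.add(new Instant[] {start, next.isBefore(end) ? next : end});
            start = next;
        }
        return partitions;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2020-05-20T00:00:00Z");
        Instant t1 = Instant.parse("2020-05-20T03:30:00Z");
        // 3 full hours plus a final 30-minute remainder -> 4 partitions
        System.out.println(hourlyPartitions(t0, t1).size());
    }
}
```

As the ticket notes, fixed time partitions are easy to implement but can produce hot partitions during "busy hours"; a splittable DoFn would let the runner rebalance instead.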
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435310&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435310 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 20/May/20 02:05 Start Date: 20/May/20 02:05 Worklog Time Spent: 10m Work Description: pabloem commented on pull request #11596: URL: https://github.com/apache/beam/pull/11596#issuecomment-631189422 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435310) Time Spent: 8h 20m (was: 8h 10m) > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 8h 20m > Remaining Estimate: 0h > > Currently the List Messages API paginates through in a single ProcessElement > call. > However, we could get a restriction based on createTime using the Messages.List > filter and orderby. > > This is in line with the future roadmap, where the HL7v2 bulk export API becomes > available and should allow splitting (e.g. on the create-time dimension). > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: make > optimization the runner's problem, potentially unnecessarily complex for this > use case) > 2. static splitting on some time partition, e.g. finding the earliest > createTime and emitting a PCollection of 1-hour partitions, then paginating > through each hour of data within the time frame that the store spans, in a > separate ProcessElement. 
(easy to implement but will likely have hot keys / > stragglers based on "busy hours") > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (BEAM-7304) Twister2 Beam runner
[ https://issues.apache.org/jira/browse/BEAM-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111666#comment-17111666 ] Pulasthi Wickramasinghe commented on BEAM-7304: --- That is great to hear; let me know if anything needs to happen from my end. > Twister2 Beam runner > > > Key: BEAM-7304 > URL: https://issues.apache.org/jira/browse/BEAM-7304 > Project: Beam > Issue Type: New Feature > Components: runner-ideas >Reporter: Pulasthi Wickramasinghe >Assignee: Pulasthi Wickramasinghe >Priority: P3 > Fix For: 2.22.0 > > Time Spent: 19h 10m > Remaining Estimate: 0h > > Twister2 is a big data framework which supports both batch and stream > processing [1] [2]. The goal is to develop a Beam runner for Twister2. > [1] [https://github.com/DSC-SPIDAL/twister2] > [2] [https://twister2.org/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
[ https://issues.apache.org/jira/browse/BEAM-10037?focusedWorklogId=435305&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435305 ] ASF GitHub Bot logged work on BEAM-10037: - Author: ASF GitHub Bot Created on: 20/May/20 01:16 Start Date: 20/May/20 01:16 Worklog Time Spent: 10m Work Description: omarismail94 commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427686703 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; -outputStream.apply( -"log_result", -MapElements.via( -new SimpleFunction<Row, Row>() { - @Override - public Row apply(Row input) { -// expect output: -// PCOLLECTION: [3, row, 3.0] -// PCOLLECTION: [2, row, 2.0] -System.out.println("PCOLLECTION: " + input.getValues()); -return input; - } -})); +outputStream +.apply( +"log_result", +MapElements.via( +new SimpleFunction<Row, Row>() { + @Override + public Row apply(Row input) { +// expect output: +// PCOLLECTION: [3, row, 3.0] +// PCOLLECTION: [2, row, 2.0] +System.out.println("PCOLLECTION: " + input.getValues()); +return input; + } +})) +.setCoder(RowCoder.of(type)); // Case 2. run the query with SqlTransform.query over result PCollection of case 1. 
PCollection<Row> outputStream2 = PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream) .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2")); // print the output record of case 2; -outputStream2.apply( -"log_result", -MapElements.via( -new SimpleFunction<Row, Row>() { - @Override - public Row apply(Row input) { -// expect output: -// CASE1_RESULT: [row, 5.0] -System.out.println("CASE1_RESULT: " + input.getValues()); -return input; - } -})); +outputStream2 +.apply( +"log_result", +MapElements.via( +new SimpleFunction<Row, Row>() { + @Override + public Row apply(Row input) { +// expect output: +// CASE1_RESULT: [row, 5.0] +System.out.println("CASE1_RESULT: " + input.getValues()); +return input; + } +})) +.setCoder( +RowCoder.of( +Schema.builder() +.addStringField("stringField") +.addDoubleField("doubleField") +.build())); Review comment: Actually, it is not due to the reduction in the number of fields, but the order in which the fields are selected in the SELECT statement. Here is the order it expects: Int, String, Double; the fields that represent those types are c1, c2, c3. If your results print out of order, it fails due to the `ClassCastException`. I tried doing this query and it failed: `select c2, sum(c1), sum(c3) from CASE1_RESULT group by c2`, but if I do `select sum(c1),c2, sum(c3) from CASE1_RESULT group by c2` it works! You can see that in the one that failed, the positions of c1 and c2 have switched, so the encoder trips out. What's cool is that you can see the results correctly calculated in: ` System.out.println("CASE1_RESULT: " + input.getValues());` but it seems that when the result is encoded, the program throws an error due to the results being out of order. I guess this is because it sees `.setRowSchema(type);`, and as the order of the schema is "Int, String, Double", the results have to abide by that rule. That's why it fails when we did: `c2, sum(c3) from CASE1_RESULT group by c2` This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435305) Time Spent: 2h (was: 1h 50m) > BeamSqlExample.java fails to build when running ./gradlew command > ---
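The `ClassCastException` discussed in this review thread comes from positional encoding: each value in a `Row` is cast against the type declared at the same index in the schema. The following plain-Java stand-in (an analogy, not Beam's actual `RowCoder`) reproduces the failure mode:

```java
import java.util.List;

public class PositionalCoder {
    // Mimics positional row encoding: each value is cast to the type declared
    // at the same index in the schema. Reordering the SELECT list without
    // reordering the schema causes a ClassCastException, exactly as described
    // in the review comment above.
    static void check(List<Class<?>> schema, List<Object> row) {
        for (int i = 0; i < schema.size(); i++) {
            schema.get(i).cast(row.get(i)); // throws ClassCastException on mismatch
        }
    }
}
```

With schema order (Integer, String, Double), the row `[2, "row", 5.0]` encodes fine, while `["row", 2, 5.0]` fails on the first cast, which is why swapping c1 and c2 in the SELECT list broke the example.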
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435303&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435303 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 20/May/20 01:14 Start Date: 20/May/20 01:14 Worklog Time Spent: 10m Work Description: jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427686974 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) * @param filter the filter */ ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) { - this.hl7v2Stores = hl7v2Stores.get(); - this.filter = filter.get(); + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; +} +/** + * Instantiates a new List hl 7 v 2 messages. + * + * @param hl7v2Stores the hl 7 v 2 stores + * @param filter the filter + * @param initialSplitDuration the initial split duration for sendTime dimension splits + */ +ListHL7v2Messages( +ValueProvider<List<String>> hl7v2Stores, +ValueProvider<String> filter, +Duration initialSplitDuration) { + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; + this.initialSplitDuration = initialSplitDuration; } +/** + * Instantiates a new List hl7v2 messages. + * + * @param hl7v2Stores the hl7v2 stores + */ ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores) { - this.hl7v2Stores = hl7v2Stores.get(); + this.hl7v2Stores = hl7v2Stores; this.filter = null; } +/** + * Instantiates a new List hl7v2 messages. 
+ * + * @param hl7v2Stores the hl7v2 stores + * @param initialSplitDuration the initial split duration + */ +ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, Duration initialSplitDuration) { + this.hl7v2Stores = hl7v2Stores; + this.initialSplitDuration = initialSplitDuration; +} + @Override public PCollection<HL7v2Message> expand(PBegin input) { return input - .apply(Create.of(this.hl7v2Stores)) - .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter))) + .apply(Create.ofProvider(this.hl7v2Stores, ListCoder.of(StringUtf8Coder.of()))) + .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x)) + .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, initialSplitDuration))) .setCoder(new HL7v2MessageCoder()) // Break fusion to encourage parallelization of downstream processing. .apply(Reshuffle.viaRandomKey()); } } + /** + * Implemented as a Splittable DoFn that claims millisecond resolutions of offset restrictions in the + * Message.sendTime dimension. + */ + @BoundedPerElement static class ListHL7v2MessagesFn extends DoFn<String, HL7v2Message> { -private final String filter; +private static final Logger LOG = LoggerFactory.getLogger(ListHL7v2MessagesFn.class); +private ValueProvider<String> filter; +// These control the initial restriction split which means that the list of integer pairs +// must comfortably fit in memory. +private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = Duration.standardDays(1); +private static final Duration DEFAULT_MIN_SPLIT_DURATION = Duration.standardHours(1); +private Duration initialSplitDuration; +private Instant from; +private Instant to; Review comment: I don't think so; they don't get set until we make the earliest/latest sendTime query in @GetInitialRestriction This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435303) Time Spent: 8h 10m (was: 8h) > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 8h 10m > Remaining Estimate: 0h > > Currently the List Messages API paginates through in a single ProcessElement > Call. > However we could get a restriction based on createTime using Messages.List > filter and orderby. > > This is inline with t
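The diff above splits the sendTime restriction using a desired split duration and a minimum split duration (DEFAULT_DESIRED_SPLIT_DURATION and DEFAULT_MIN_SPLIT_DURATION). A plain-Java sketch of that splitting policy, under the assumption that ranges are millisecond offsets and that a remainder shorter than the minimum is merged into the previous range (the class and method names here are illustrative, not the actual @SplitRestriction implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class SendTimeSplitter {
    // Splits the sendTime restriction [from, to) in epoch millis into
    // sub-ranges of roughly desiredMillis each. A trailing remainder shorter
    // than minMillis is merged into the previous range instead of being
    // emitted as a tiny straggler restriction.
    static List<long[]> split(long from, long to, long desiredMillis, long minMillis) {
        List<long[]> ranges = new ArrayList<>();
        long start = from;
        while (start < to) {
            long end = Math.min(start + desiredMillis, to);
            if (to - end < minMillis) {
                end = to; // absorb the short remainder into this range
            }
            ranges.add(new long[] {start, end});
            start = end;
        }
        return ranges;
    }
}
```

This matches the diff's comment that the list of range pairs must fit comfortably in memory: a store spanning N days at a one-day desired split yields only about N initial restrictions.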
[jira] [Work logged] (BEAM-9723) [Java] PTransform that connects to Cloud DLP deidentification service
[ https://issues.apache.org/jira/browse/BEAM-9723?focusedWorklogId=435301&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435301 ] ASF GitHub Bot logged work on BEAM-9723: Author: ASF GitHub Bot Created on: 20/May/20 01:05 Start Date: 20/May/20 01:05 Worklog Time Spent: 10m Work Description: santhh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427684818 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.auto.value.AutoValue; +import com.google.cloud.dlp.v2.DlpServiceClient; +import com.google.privacy.dlp.v2.ContentItem; +import com.google.privacy.dlp.v2.DeidentifyConfig; +import com.google.privacy.dlp.v2.DeidentifyContentRequest; +import com.google.privacy.dlp.v2.DeidentifyContentResponse; +import com.google.privacy.dlp.v2.FieldId; +import com.google.privacy.dlp.v2.InspectConfig; +import com.google.privacy.dlp.v2.ProjectName; +import com.google.privacy.dlp.v2.Table; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided + * settings. The transform supports both CSV formatted input data and unstructured input. + * + * If the csvHeader property is set, csvDelimiter also should be, else the results will be + * incorrect. If csvHeader is not set, input is assumed to be unstructured. + * + * Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The + * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}. + * + * Batch size defines how big are batches sent to DLP at once in bytes. + * + * The transform outputs {@link KV} of {@link String} (eg. filename) and {@link + * DeidentifyContentResponse}, which will contain {@link Table} of results for the user to consume. 
+ */ +@Experimental +@AutoValue +public abstract class DLPDeidentifyText +extends PTransform< +PCollection>, PCollection>> { + + @Nullable + public abstract String inspectTemplateName(); + + @Nullable + public abstract String deidentifyTemplateName(); + + @Nullable + public abstract InspectConfig inspectConfig(); + + @Nullable + public abstract DeidentifyConfig deidentifyConfig(); + + @Nullable + public abstract PCollectionView> csvHeader(); + + @Nullable + public abstract String csvDelimiter(); + + public abstract Integer batchSize(); + + public abstract String projectId(); + + @AutoValue.Builder + public abstract static class Builder { +public abstract Builder setInspectTemplateName(String inspectTemplateName); + +public abstract Builder setCsvHeader(PCollectionView> csvHeader); + +public abstract Builder setCsvDelimiter(String delimiter); + +public abstract Builder setBatchSize(Integer batchSize); + +public abstract Builder setProjectId(String projectId); + +public abstract Builder setDeidentifyTemplateName(String deidentifyTemplateName); + +public abstract Builder setInspectConfig(InspectConfig inspectConfig); + +public abstract Builder setDeidentifyConfig(DeidentifyConfig deidentifyConfig); + +public abstract DLPDeidentifyText build(); + } + + public static DLPDeidentifyText.Builder newBuilder() { +return new AutoValue_DLPDeidentifyText.Builder(); + } + + /** + * The transform batches the contents of input PCollection and then calls Cloud DLP service to + * perform the deidentification. + * + * @pa
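The javadoc quoted in the diff above states several configuration constraints: for each of the inspect and deidentify pairs, either the template name or the config object should be set, and csvDelimiter is required whenever csvHeader is set. A plain validity check modeling those stated constraints (the class and parameter names here are illustrative, not part of the `DLPDeidentifyText` API, and the reviewers note some of these requirements may be relaxed in practice):

```java
public class DlpTextConfig {
    // Models the javadoc's constraints: each (templateName, config) pair needs
    // at least one member set, and csvHeader without csvDelimiter is invalid.
    static boolean isValid(String inspectTemplate, Object inspectConfig,
                           String deidTemplate, Object deidConfig,
                           Object csvHeader, String csvDelimiter) {
        boolean inspectOk = inspectTemplate != null || inspectConfig != null;
        boolean deidOk = deidTemplate != null || deidConfig != null;
        boolean csvOk = csvHeader == null || csvDelimiter != null;
        return inspectOk && deidOk && csvOk;
    }
}
```

Encoding such rules as a single precondition check (rather than failing later inside the DoFn) is the kind of validation an AutoValue `build()` method can perform up front.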
[jira] [Work logged] (BEAM-9723) [Java] PTransform that connects to Cloud DLP deidentification service
[ https://issues.apache.org/jira/browse/BEAM-9723?focusedWorklogId=435300&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435300 ] ASF GitHub Bot logged work on BEAM-9723: Author: ASF GitHub Bot Created on: 20/May/20 01:01 Start Date: 20/May/20 01:01 Worklog Time Spent: 10m Work Description: santhh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427683846 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.auto.value.AutoValue; +import com.google.cloud.dlp.v2.DlpServiceClient; +import com.google.privacy.dlp.v2.ContentItem; +import com.google.privacy.dlp.v2.DeidentifyConfig; +import com.google.privacy.dlp.v2.DeidentifyContentRequest; +import com.google.privacy.dlp.v2.DeidentifyContentResponse; +import com.google.privacy.dlp.v2.FieldId; +import com.google.privacy.dlp.v2.InspectConfig; +import com.google.privacy.dlp.v2.ProjectName; +import com.google.privacy.dlp.v2.Table; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided + * settings. The transform supports both CSV formatted input data and unstructured input. + * + * If the csvHeader property is set, csvDelimiter also should be, else the results will be + * incorrect. If csvHeader is not set, input is assumed to be unstructured. + * + * Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The + * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}. + * + * Batch size defines how big are batches sent to DLP at once in bytes. + * + * The transform outputs {@link KV} of {@link String} (eg. filename) and {@link + * DeidentifyContentResponse}, which will contain {@link Table} of results for the user to consume. 
+ */ +@Experimental +@AutoValue +public abstract class DLPDeidentifyText +extends PTransform< +PCollection>, PCollection>> { + + @Nullable + public abstract String inspectTemplateName(); + + @Nullable + public abstract String deidentifyTemplateName(); + + @Nullable + public abstract InspectConfig inspectConfig(); + + @Nullable + public abstract DeidentifyConfig deidentifyConfig(); + + @Nullable + public abstract PCollectionView> csvHeader(); + + @Nullable + public abstract String csvDelimiter(); + + public abstract Integer batchSize(); + + public abstract String projectId(); + + @AutoValue.Builder + public abstract static class Builder { +public abstract Builder setInspectTemplateName(String inspectTemplateName); + +public abstract Builder setCsvHeader(PCollectionView> csvHeader); + +public abstract Builder setCsvDelimiter(String delimiter); + +public abstract Builder setBatchSize(Integer batchSize); + +public abstract Builder setProjectId(String projectId); + +public abstract Builder setDeidentifyTemplateName(String deidentifyTemplateName); + +public abstract Builder setInspectConfig(InspectConfig inspectConfig); + +public abstract Builder setDeidentifyConfig(DeidentifyConfig deidentifyConfig); + +public abstract DLPDeidentifyText build(); + } + Review comment: For de-id - it's also same as re-id. de-id template in required but inspect is optional. This is an automated message from the Apache Git Service. To respond to the message, please lo
[jira] [Work logged] (BEAM-9723) [Java] PTransform that connects to Cloud DLP deidentification service
[ https://issues.apache.org/jira/browse/BEAM-9723?focusedWorklogId=435298&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435298 ] ASF GitHub Bot logged work on BEAM-9723: Author: ASF GitHub Bot Created on: 20/May/20 00:58 Start Date: 20/May/20 00:58 Worklog Time Spent: 10m Work Description: santhh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427683019 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.privacy.dlp.v2.Table; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.KV; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP + * service. + */ +@Experimental +class BatchRequestForDLP extends DoFn, KV>> { + public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class); + + private final Counter numberOfRowsBagged = + Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged"); + private final Integer batchSize; + + @StateId("elementsBag") + private final StateSpec>> elementsBag = StateSpecs.bag(); + + @TimerId("eventTimer") + private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + public BatchRequestForDLP(Integer batchSize) { Review comment: +1 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435298) Time Spent: 3h 10m (was: 3h) > [Java] PTransform that connects to Cloud DLP deidentification service > - > > Key: BEAM-9723 > URL: https://issues.apache.org/jira/browse/BEAM-9723 > Project: Beam > Issue Type: Sub-task > Components: io-java-gcp >Reporter: Michał Walenia >Assignee: Michał Walenia >Priority: P2 > Time Spent: 3h 10m > Remaining Estimate: 0h >
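The `BatchRequestForDLP` DoFn quoted above bags incoming elements in a `BagState` and flushes them as one request when a size budget or an event-time timer fires. The size-based part of that policy can be sketched without Beam's state API (a plain-Java stand-in, purely illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class Batcher {
    // Mimics the bagging behavior: buffer elements until adding the next one
    // would exceed the byte budget, then emit the buffered batch as a single
    // request. In the real DoFn the buffer lives in BagState and a timer
    // flushes leftovers at window expiry.
    private final int batchSizeBytes;
    private final List<String> buffer = new ArrayList<>();
    private int bufferedBytes = 0;
    final List<List<String>> flushed = new ArrayList<>();

    Batcher(int batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    void add(String element) {
        if (!buffer.isEmpty() && bufferedBytes + element.length() > batchSizeBytes) {
            flush(); // emit the current batch before it would overflow
        }
        buffer.add(element);
        bufferedBytes += element.length();
    }

    void flush() {
        if (!buffer.isEmpty()) {
            flushed.add(new ArrayList<>(buffer));
            buffer.clear();
            bufferedBytes = 0;
        }
    }
}
```

Batching like this is what lets the transform better utilize the DLP service: many small rows become one sized request instead of one RPC per element.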
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435294&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435294 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 00:55 Start Date: 20/May/20 00:55 Worklog Time Spent: 10m Work Description: y1chi opened a new pull request #11756: URL: https://github.com/apache/beam/pull/11756 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435296&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435296 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 00:55 Start Date: 20/May/20 00:55 Worklog Time Spent: 10m Work Description: y1chi commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631169795 R: @boyuanzz This is ready for review. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435296) Time Spent: 1h 40m (was: 1.5h) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Priority: P2 > Time Spent: 1h 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9723) [Java] PTransform that connects to Cloud DLP deidentification service
[ https://issues.apache.org/jira/browse/BEAM-9723?focusedWorklogId=435293&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435293 ] ASF GitHub Bot logged work on BEAM-9723: Author: ASF GitHub Bot Created on: 20/May/20 00:53 Start Date: 20/May/20 00:53 Worklog Time Spent: 10m Work Description: santhh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427681778 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.auto.value.AutoValue; +import com.google.cloud.dlp.v2.DlpServiceClient; +import com.google.privacy.dlp.v2.ContentItem; +import com.google.privacy.dlp.v2.DeidentifyConfig; +import com.google.privacy.dlp.v2.FieldId; +import com.google.privacy.dlp.v2.InspectConfig; +import com.google.privacy.dlp.v2.ProjectName; +import com.google.privacy.dlp.v2.ReidentifyContentRequest; +import com.google.privacy.dlp.v2.ReidentifyContentResponse; +import com.google.privacy.dlp.v2.Table; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} connecting to Cloud DLP and inspecting text for identifying data according + * to provided settings. + * + * Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set, the + * same goes for reidentifyTemplateName or reidentifyConfig. + * + * Batch size defines how big are batches sent to DLP at once in bytes. 
+ */ +@Experimental +@AutoValue +public abstract class DLPReidentifyText +extends PTransform< +PCollection<KV<String, String>>, PCollection<KV<String, String>>> { + + public static final Logger LOG = LoggerFactory.getLogger(DLPInspectText.class); + + public static final Integer DLP_PAYLOAD_LIMIT = 52400; + + @Nullable + public abstract String inspectTemplateName(); + + @Nullable + public abstract String reidentifyTemplateName(); + + @Nullable + public abstract InspectConfig inspectConfig(); + + @Nullable + public abstract DeidentifyConfig reidentifyConfig(); + + @Nullable + public abstract String csvDelimiter(); + + @Nullable + public abstract PCollectionView<List<String>> csvHeaders(); + + public abstract Integer batchSize(); + + public abstract String projectId(); + + @AutoValue.Builder + public abstract static class Builder { +public abstract Builder setInspectTemplateName(String inspectTemplateName); + +public abstract Builder setInspectConfig(InspectConfig inspectConfig); + +public abstract Builder setReidentifyConfig(DeidentifyConfig deidentifyConfig); + +public abstract Builder setReidentifyTemplateName(String deidentifyTemplateName); + +public abstract Builder setBatchSize(Integer batchSize); + +public abstract Builder setCsvHeaders(PCollectionView<List<String>> csvHeaders); + +public abstract Builder setCsvDelimiter(String delimiter); + +public abstract Builder setProjectId(String projectId); + +public abstract DLPReidentifyText build(); + } + + public static DLPReidentifyText.Builder newBuilder() { +return new AutoValue_DLPReidentifyText.Builder(); + } + + @Override + public PCollection<KV<String, String>> expand( + PCollection<KV<String, String>> input) { +return input +.apply(ParDo.of(new MapStringToDlpRow(csvDelimiter()))) +.apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize()))) +.apply( +"DLPDeidentify", +ParDo.of( +new ReidentifyText( +projectId(), +inspectTempla
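The quoted javadoc states that the batch size controls how many bytes are sent to DLP per request. As an illustration only (Python, not Beam's actual `BatchRequestForDLP` DoFn, whose behavior may differ), a greedy byte-budget batcher along those lines might look like:

```python
DLP_PAYLOAD_LIMIT = 52400  # byte limit, taken from the constant quoted in the PR


def batch_by_bytes(rows, max_bytes):
    """Greedily group UTF-8-encoded rows into batches whose total size
    stays at or under max_bytes, preserving row order."""
    batches, current, current_size = [], [], 0
    for row in rows:
        size = len(row.encode("utf-8"))
        if size > max_bytes:
            # A single over-sized row can never fit into any request.
            raise ValueError(
                f"row of {size} bytes exceeds the {max_bytes}-byte payload limit")
        if current and current_size + size > max_bytes:
            # Adding this row would overflow the budget: flush the batch.
            batches.append(current)
            current, current_size = [], 0
        current.append(row)
        current_size += size
    if current:
        batches.append(current)
    return batches
```

A single row larger than the budget can never be sent at all, so the sketch rejects it up front rather than emitting an over-sized request.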
[jira] [Work logged] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
[ https://issues.apache.org/jira/browse/BEAM-10037?focusedWorklogId=435292&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435292 ] ASF GitHub Bot logged work on BEAM-10037: - Author: ASF GitHub Bot Created on: 20/May/20 00:52 Start Date: 20/May/20 00:52 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427681580 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; -outputStream.apply( -"log_result", -MapElements.via( -new SimpleFunction<Row, Row>() { - @Override - public Row apply(Row input) { -// expect output: -// PCOLLECTION: [3, row, 3.0] -// PCOLLECTION: [2, row, 2.0] -System.out.println("PCOLLECTION: " + input.getValues()); -return input; - } -})); +outputStream +.apply( +"log_result", +MapElements.via( +new SimpleFunction<Row, Row>() { + @Override + public Row apply(Row input) { +// expect output: +// PCOLLECTION: [3, row, 3.0] +// PCOLLECTION: [2, row, 2.0] +System.out.println("PCOLLECTION: " + input.getValues()); +return input; + } +})) +.setCoder(RowCoder.of(type)); // Case 2. run the query with SqlTransform.query over result PCollection of case 1.
PCollection<Row> outputStream2 = PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream) .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2")); // print the output record of case 2; -outputStream2.apply( -"log_result", -MapElements.via( -new SimpleFunction<Row, Row>() { - @Override - public Row apply(Row input) { -// expect output: -// CASE1_RESULT: [row, 5.0] -System.out.println("CASE1_RESULT: " + input.getValues()); -return input; - } -})); +outputStream2 +.apply( +"log_result", +MapElements.via( +new SimpleFunction<Row, Row>() { + @Override + public Row apply(Row input) { +// expect output: +// CASE1_RESULT: [row, 5.0] +System.out.println("CASE1_RESULT: " + input.getValues()); +return input; + } +})) +.setCoder( +RowCoder.of( +Schema.builder() +.addStringField("stringField") +.addDoubleField("doubleField") +.build())); Review comment: oh for this call you will need to use ``` Schema.builder() .addStringField("stringField") .addDoubleField("doubleField") .build() ``` like you had in the setCoder call This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435292) Time Spent: 1.5h (was: 1h 20m) > BeamSqlExample.java fails to build when running ./gradlew command > - > > Key: BEAM-10037 > URL: https://issues.apache.org/jira/browse/BEAM-10037 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Omar Ismail >Assignee: Omar Ismail >Priority: P3 > Time Spent: 1.5h > Remaining Estimate: 0h > > In the `BeamSqlExample.java` class, the instructions state that to run the > example, use: > `./gradlew :sdks:java:extensions:sql:runBasicExample`. > I tried this and the build failed due to `java.lang.IllegalStateException: > Unable to return a default Coder` > > I will try to fix this!
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9723) [Java] PTransform that connects to Cloud DLP deidentification service
[ https://issues.apache.org/jira/browse/BEAM-9723?focusedWorklogId=435291&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435291 ] ASF GitHub Bot logged work on BEAM-9723: Author: ASF GitHub Bot Created on: 20/May/20 00:51 Start Date: 20/May/20 00:51 Worklog Time Spent: 10m Work Description: santhh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427681156 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java ## [quoted diff identical to worklog 435293 above; comment is on the `"DLPDeidentify"` step name] Review comment: should this be re-identify? This is
[jira] [Work logged] (BEAM-10038) Add script to mass-comment Jenkins triggers on PR
[ https://issues.apache.org/jira/browse/BEAM-10038?focusedWorklogId=435290&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435290 ] ASF GitHub Bot logged work on BEAM-10038: - Author: ASF GitHub Bot Created on: 20/May/20 00:50 Start Date: 20/May/20 00:50 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on a change in pull request #11755: URL: https://github.com/apache/beam/pull/11755#discussion_r427681057 ## File path: release/src/main/scripts/mass_comment.py ## @@ -0,0 +1,141 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Script for mass-commenting Jenkins test triggers on a Beam PR.""" + +import itertools +import os +import socket +import sys +import time +import traceback +import re +import requests +from datetime import datetime + + +COMMENTS_TO_ADD=[ + "Run Go PostCommit", + "Run Java PostCommit", + "Run Java PortabilityApi PostCommit", + "Run Java Flink PortableValidatesRunner Batch", + "Run Java Flink PortableValidatesRunner Streaming", + "Run Apex ValidatesRunner", + "Run Dataflow ValidatesRunner", + "Run Flink ValidatesRunner", + "Run Gearpump ValidatesRunner", + "Run Dataflow PortabilityApi ValidatesRunner", + "Run Samza ValidatesRunner", + "Run Spark ValidatesRunner", + "Run Python Dataflow ValidatesContainer", + "Run Python Dataflow ValidatesRunner", + "Run Python 3.5 Flink ValidatesRunner", + "Run Python 2 PostCommit", + "Run Python 3.5 PostCommit", + "Run SQL PostCommit", + "Run Go PreCommit", + "Run Java PreCommit", + "Run Java_Examples_Dataflow PreCommit", + "Run JavaPortabilityApi PreCommit", + "Run Portable_Python PreCommit", + "Run PythonLint PreCommit", + "Run Python PreCommit", + "Run Python DockerBuild PreCommit" +] Review comment: Should we also remove the duplicate list from [verify_release_build.sh](https://github.com/apache/beam/blob/master/release/src/main/scripts/verify_release_build.sh#L43) as part of this PR? I'm not really clear on where this list comes from. Is the goal to launch every single jenkins job? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435290) Time Spent: 20m (was: 10m) > Add script to mass-comment Jenkins triggers on PR > - > > Key: BEAM-10038 > URL: https://issues.apache.org/jira/browse/BEAM-10038 > Project: Beam > Issue Type: Improvement > Components: build-system >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: P2 > Time Spent: 20m > Remaining Estimate: 0h > > This is a work in progress, it just needs to be touched up and added to the > Beam repo: > https://gist.github.com/Ardagan/13e6031e8d1c9ebbd3029bf365c1a517 -- This message was sent by Atlassian Jira (v8.3.4#803005)
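The script under review fires each Jenkins job by posting its trigger phrase as a separate PR comment through the GitHub REST API. A minimal sketch of the request-building step (dry run, no network; `build_comment_requests` and its de-duplication are illustrative assumptions, though the issues-comments endpoint is the standard GitHub API URL):

```python
import json

GITHUB_API = "https://api.github.com/repos/apache/beam/issues/{pr}/comments"


def build_comment_requests(pr_number, trigger_phrases):
    """Build one (url, json_body) pair per Jenkins trigger phrase.

    Posting each body as a PR comment is what fires the corresponding
    Jenkins job; this helper only prepares the requests."""
    url = GITHUB_API.format(pr=pr_number)
    # De-duplicate while preserving order, in case the phrase list is
    # ever merged with the copy in verify_release_build.sh.
    seen, payloads = set(), []
    for phrase in trigger_phrases:
        if phrase in seen:
            continue
        seen.add(phrase)
        payloads.append((url, json.dumps({"body": phrase})))
    return payloads
```

De-duplicating up front would also make it harmless to share one list between this script and verify_release_build.sh, per the review comment above.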
[jira] [Work logged] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
[ https://issues.apache.org/jira/browse/BEAM-10037?focusedWorklogId=435287&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435287 ] ASF GitHub Bot logged work on BEAM-10037: - Author: ASF GitHub Bot Created on: 20/May/20 00:41 Start Date: 20/May/20 00:41 Worklog Time Spent: 10m Work Description: omarismail94 commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427678727 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## [quoted diff identical to worklog 435292 above] Review comment: This is the part of the stack trace that makes me think that: ``` Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer at org.apache.beam.sdk.coders.VarIntCoder.encode(VarIntCoder.java:33) at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:270) at org.apache.beam.sdk.coders.Coder$ByteBuddy$E99UrF3W.encode(Unknown Source) at org.apache.beam.sdk.coders.Coder$ByteBuddy$E99UrF3W.encode(Unknown Source) at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:115) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435287) Time Spent: 1h 20m (was: 1h 10m) > BeamSqlExample.java fails to build when running ./gradlew command > - > > Key: BEAM-10037 > URL: https://issues.apache.org/jira/browse/BEAM-10037 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Omar Ismail >Assignee: Omar Ismail >Priority: P3 > Time Spent: 1h 20m > Remaining Estimate: 0h > > In the `BeamSqlExample.java` class, the instructions state that to run the > example, use: >
[jira] [Work logged] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
[ https://issues.apache.org/jira/browse/BEAM-10037?focusedWorklogId=435284&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435284 ] ASF GitHub Bot logged work on BEAM-10037: - Author: ASF GitHub Bot Created on: 20/May/20 00:40 Start Date: 20/May/20 00:40 Worklog Time Spent: 10m Work Description: omarismail94 commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427678033 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## [quoted diff identical to worklog 435292 above] Review comment: I tried `setRowSchema(type)` and it failed with `java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer`. I think it is inferring the schema as 3 fields, but the result only returns two fields, and that's why it throws the error This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435284) Time Spent: 1h (was: 50m) > BeamSqlExample.java fails to build when running ./gradlew command > - > > Key: BEAM-10037 > URL: https://issues.apache.org/jira/browse/BEAM-10037 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Omar Ismail >Assignee: Omar Ismail >Priority: P3 > Time Spent: 1h > Remaining Estimate: 0h > > In the `BeamSqlExample.java` class, the instructions state that to run the > example, use: > `./gradlew :sdks:java:extensions:sql:runBasicExample`.
> I tried this and the build failed due to `java.lang.IllegalStateException: > Unable to return a default Coder` > > I will try to fix this! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
[ https://issues.apache.org/jira/browse/BEAM-10037?focusedWorklogId=435285&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435285 ] ASF GitHub Bot logged work on BEAM-10037: - Author: ASF GitHub Bot Created on: 20/May/20 00:40 Start Date: 20/May/20 00:40 Worklog Time Spent: 10m Work Description: omarismail94 commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427678033 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## [quoted diff identical to worklog 435292 above] Review comment: [edited repost of the comment in worklog 435284 above, clarifying that the result only returns two fields] This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435285) Time Spent: 1h 10m (was: 1h) > BeamSqlExample.java fails to build when running ./gradlew command > - > > Key: BEAM-10037 > URL: https://issues.apache.org/jira/browse/BEAM-10037 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Omar Ismail >Assignee: Omar Ismail >Priority: P3 > Time Spent: 1h 10m > Remaining Estimate: 0h > > In the `BeamSqlExample.java` class, the instructions state that to run the > example, use: > `./gradlew :sdks:java:extensions:sql:runBasicExample`.
> I tried this and the build failed due to `java.lang.IllegalStateException: > Unable to return a default Coder` > > I will try to fix this! -- This message was sent by Atlassian Jira (v8.3.4#803005)
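The ClassCastException discussed above falls out of encoding the two-field aggregation result with the three-field input schema: the generated row coder walks the declared fields in order, so the string in the first result column meets the `VarIntCoder` declared for `c1`. A toy illustration of that mechanism (plain Python; nothing here is Beam's actual coder machinery, and the names are illustrative):

```python
# Toy stand-in for a schema-based row coder: each declared field type maps
# to a Python type check standing in for that field's coder.
ENCODERS = {"int32": int, "string": str, "double": float}


def encode_row(schema, row):
    """schema: list of (field_name, field_type) pairs.

    Like the generated row coder, this walks the declared fields in order
    (zip truncates, so an arity mismatch surfaces as a type error on the
    first incompatible field rather than as a count mismatch)."""
    out = []
    for (name, ftype), value in zip(schema, row):
        if not isinstance(value, ENCODERS[ftype]):
            raise TypeError(
                f"field {name!r}: cannot encode {type(value).__name__} as {ftype}")
        out.append(value)
    return out


# Schemas mirroring the example: the CASE1 input vs. the "select c2, sum(c3)" result.
input_schema = [("c1", "int32"), ("c2", "string"), ("c3", "double")]
result_schema = [("stringField", "string"), ("doubleField", "double")]
```

Encoding `["row", 5.0]` against `result_schema` succeeds, while encoding the same row against `input_schema` fails on `c1`, mirroring the String-to-Integer cast error in the quoted stack trace.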
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435286&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435286 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 00:40 Start Date: 20/May/20 00:40 Worklog Time Spent: 10m Work Description: y1chi closed pull request #11753: URL: https://github.com/apache/beam/pull/11753 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435286) Time Spent: 1h 20m (was: 1h 10m) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Priority: P2 > Time Spent: 1h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
[ https://issues.apache.org/jira/browse/BEAM-10037?focusedWorklogId=435283&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435283 ] ASF GitHub Bot logged work on BEAM-10037: - Author: ASF GitHub Bot Created on: 20/May/20 00:40 Start Date: 20/May/20 00:40 Worklog Time Spent: 10m Work Description: omarismail94 commented on pull request #11754: URL: https://github.com/apache/beam/pull/11754#issuecomment-631165775 > Thank you @omarismail94! > > We should probably be running this continuously to make sure we don't break it again. Would you mind adding the gradle task for this to the SQL preCommit [here](https://github.com/apache/beam/blob/master/build.gradle#L154)? That way it will run before we merge any PR that affects SQL. I will add both this and runPojoExample to SQL preCommit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435283) Time Spent: 50m (was: 40m) > BeamSqlExample.java fails to build when running ./gradlew command > - > > Key: BEAM-10037 > URL: https://issues.apache.org/jira/browse/BEAM-10037 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Omar Ismail >Assignee: Omar Ismail >Priority: P3 > Time Spent: 50m > Remaining Estimate: 0h > > In the `BeamSqlExample.java` class, the instructions state that to run the > example, use: > `./gradlew :sdks:java:extensions:sql:runBasicExample`. > I tried this and the build failed due to `java.lang.IllegalStateException: > Unable to return a default Coder` > > I will try to fix this! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
[ https://issues.apache.org/jira/browse/BEAM-10037?focusedWorklogId=435282&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435282 ] ASF GitHub Bot logged work on BEAM-10037: - Author: ASF GitHub Bot Created on: 20/May/20 00:39 Start Date: 20/May/20 00:39 Worklog Time Spent: 10m Work Description: omarismail94 commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427678033 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; -outputStream.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// PCOLLECTION: [3, row, 3.0] -// PCOLLECTION: [2, row, 2.0] -System.out.println("PCOLLECTION: " + input.getValues()); -return input; - } -})); +outputStream +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// PCOLLECTION: [3, row, 3.0] +// PCOLLECTION: [2, row, 2.0] +System.out.println("PCOLLECTION: " + input.getValues()); +return input; + } +})) +.setCoder(RowCoder.of(type)); // Case 2. run the query with SqlTransform.query over result PCollection of case 1. 
PCollection outputStream2 = PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream) .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2")); // print the output record of case 2; -outputStream2.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// CASE1_RESULT: [row, 5.0] -System.out.println("CASE1_RESULT: " + input.getValues()); -return input; - } -})); +outputStream2 +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// CASE1_RESULT: [row, 5.0] +System.out.println("CASE1_RESULT: " + input.getValues()); +return input; + } +})) +.setCoder( +RowCoder.of( +Schema.builder() +.addStringField("stringField") +.addDoubleField("doubleField") +.build())); Review comment: I tried `setRowSchema(type) and it failed with `: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer`. I think it is inferring the schema as 3 fields, but the result only returns fields, and that's why it throws the error This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435282) Time Spent: 40m (was: 0.5h) > BeamSqlExample.java fails to build when running ./gradlew command > - > > Key: BEAM-10037 > URL: https://issues.apache.org/jira/browse/BEAM-10037 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Omar Ismail >Assignee: Omar Ismail >Priority: P3 > Time Spent: 40m > Remaining Estimate: 0h > > In the `BeamSqlExample.java` class, the instructions state that to run the > example, use: > `./gradlew :sdks:java:extensions:sql:runBasicExample`. 
> I tried this and the build failed due to `java.lang.IllegalStateException: > Unable to return a default Coder` > > I will try to fix this! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10039) Create a mechanism to run custom worker initialization code on Python workers
Valentyn Tymofieiev created BEAM-10039: -- Summary: Create a mechanism to run custom worker initialization code on Python workers Key: BEAM-10039 URL: https://issues.apache.org/jira/browse/BEAM-10039 Project: Beam Issue Type: New Feature Components: sdk-py-core Reporter: Valentyn Tymofieiev A couple of Beam users mentioned a use case where some initialization code needs to run on Python workers before pipeline processing starts. Such code needs to run early in the main() method of the Python worker [1]. The Java SDK provides this capability via JvmInitializer [2], BEAM-6872. Let's add such a capability for Python users as well. [1] https://github.com/apache/beam/blob/7ad4c4c8e601e39573aae7b4d778be2e908a0868/sdks/python/apache_beam/runners/worker/sdk_worker_main.py#L85 [2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/harness/JvmInitializer.java -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
[ https://issues.apache.org/jira/browse/BEAM-10037?focusedWorklogId=435281&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435281 ] ASF GitHub Bot logged work on BEAM-10037: - Author: ASF GitHub Bot Created on: 20/May/20 00:38 Start Date: 20/May/20 00:38 Worklog Time Spent: 10m Work Description: omarismail94 commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427677681 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; -outputStream.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// PCOLLECTION: [3, row, 3.0] -// PCOLLECTION: [2, row, 2.0] -System.out.println("PCOLLECTION: " + input.getValues()); -return input; - } -})); +outputStream +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// PCOLLECTION: [3, row, 3.0] +// PCOLLECTION: [2, row, 2.0] +System.out.println("PCOLLECTION: " + input.getValues()); +return input; + } +})) +.setCoder(RowCoder.of(type)); Review comment: I can do that. I did `setRowSchema(type)` and it worked! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435281) Time Spent: 0.5h (was: 20m) > BeamSqlExample.java fails to build when running ./gradlew command > - > > Key: BEAM-10037 > URL: https://issues.apache.org/jira/browse/BEAM-10037 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Omar Ismail >Assignee: Omar Ismail >Priority: P3 > Time Spent: 0.5h > Remaining Estimate: 0h > > In the `BeamSqlExample.java` class, the instructions state that to run the > example, use: > `./gradlew :sdks:java:extensions:sql:runBasicExample`. > I tried this and the build failed due to `java.lang.IllegalStateException: > Unable to return a default Coder` > > I will try to fix this! -- This message was sent by Atlassian Jira (v8.3.4#803005)
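[Editorial sketch] The `Unable to return a default Coder` failure discussed above arises because the output of the logging `MapElements` step is typed only as `Row`, so no coder can be inferred until a schema is attached explicitly (via `setCoder(RowCoder.of(type))` or `setRowSchema(type)`). A minimal, Beam-free analogy of that idea — a row coder can only encode values once it has been given a schema — might look like this (class and field names here are illustrative, not Beam's actual RowCoder):

```python
# Minimal, Beam-free sketch: a "row coder" needs an explicit schema to know
# how to encode each field. Without one it must fail -- the moral equivalent
# of Beam's "Unable to return a default Coder" error.
import struct

class RowCoder:
    CODECS = {
        "int32": lambda v: struct.pack("<i", v),
        "double": lambda v: struct.pack("<d", v),
        "string": lambda v: v.encode("utf-8"),
    }

    def __init__(self, schema=None):
        self.schema = schema  # list of (field_name, field_type) pairs

    def encode(self, row):
        if self.schema is None:
            raise RuntimeError("Unable to return a default Coder: no schema set")
        return b"".join(
            self.CODECS[ftype](value)
            for (_, ftype), value in zip(self.schema, row)
        )

schema = [("c1", "int32"), ("c2", "string"), ("c3", "double")]
coder = RowCoder(schema)            # like outputStream.setRowSchema(type)
encoded = coder.encode((2, "row", 2.0))
```

Attaching the schema up front is exactly what the fix in PR #11754 does for the example pipeline's intermediate PCollections.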
[jira] [Work logged] (BEAM-10038) Add script to mass-comment Jenkins triggers on PR
[ https://issues.apache.org/jira/browse/BEAM-10038?focusedWorklogId=435280&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435280 ] ASF GitHub Bot logged work on BEAM-10038: - Author: ASF GitHub Bot Created on: 20/May/20 00:29 Start Date: 20/May/20 00:29 Worklog Time Spent: 10m Work Description: ibzib opened a new pull request #11755: URL: https://github.com/apache/beam/pull/11755 @Ardagan wrote most of this script a while back, I just generalized it a bit. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[jira] [Work logged] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
[ https://issues.apache.org/jira/browse/BEAM-10037?focusedWorklogId=435279&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435279 ] ASF GitHub Bot logged work on BEAM-10037: - Author: ASF GitHub Bot Created on: 20/May/20 00:29 Start Date: 20/May/20 00:29 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427674472 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; -outputStream.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// PCOLLECTION: [3, row, 3.0] -// PCOLLECTION: [2, row, 2.0] -System.out.println("PCOLLECTION: " + input.getValues()); -return input; - } -})); +outputStream +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// PCOLLECTION: [3, row, 3.0] +// PCOLLECTION: [2, row, 2.0] +System.out.println("PCOLLECTION: " + input.getValues()); +return input; + } +})) +.setCoder(RowCoder.of(type)); // Case 2. run the query with SqlTransform.query over result PCollection of case 1. 
PCollection outputStream2 = PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream) .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2")); // print the output record of case 2; -outputStream2.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// CASE1_RESULT: [row, 5.0] -System.out.println("CASE1_RESULT: " + input.getValues()); -return input; - } -})); +outputStream2 +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// CASE1_RESULT: [row, 5.0] +System.out.println("CASE1_RESULT: " + input.getValues()); +return input; + } +})) +.setCoder( +RowCoder.of( +Schema.builder() +.addStringField("stringField") +.addDoubleField("doubleField") +.build())); Review comment: Here as well ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; -outputStream.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// PCOLLECTION: [3, row, 3.0] -// PCOLLECTION: [2, row, 2.0] -System.out.println("PCOLLECTION: " + input.getValues()); -return input; - } -})); +outputStream +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// PCOLLECTION: [3, row, 3.0] +// PCOLLECTION: [2, row, 2.0] +System.out.println("PCOLLECTION: " + input.getValues()); +return input; + } +})) +.setCoder(RowCoder.of(type)); Review comment: could you change this to `withRowSchema(type)`? It does the same thing, but it's less verbose This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
[jira] [Work logged] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
[ https://issues.apache.org/jira/browse/BEAM-10037?focusedWorklogId=435278&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435278 ] ASF GitHub Bot logged work on BEAM-10037: - Author: ASF GitHub Bot Created on: 20/May/20 00:14 Start Date: 20/May/20 00:14 Worklog Time Spent: 10m Work Description: omarismail94 opened a new pull request #11754: URL: https://github.com/apache/beam/pull/11754 R: @TheNeuralBit In the `BeamSqlExample.java` class, the instructions state that to run the example, use: `./gradlew :sdks:java:extensions:sql:runBasicExample`. I tried this and the build failed due to `java.lang.IllegalStateException: Unable to return a default Coder` I fixed this by setting the Coder for both anon transforms. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435277&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435277 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 00:13 Start Date: 20/May/20 00:13 Worklog Time Spent: 10m Work Description: y1chi commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631157957 Sorry, I should have marked PR as draft as I'm still testing it. Expecting a couple more minor fixes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435277) Time Spent: 1h 10m (was: 1h) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Priority: P2 > Time Spent: 1h 10m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10038) Add script to mass-comment Jenkins triggers on PR
[ https://issues.apache.org/jira/browse/BEAM-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-10038: --- Status: Open (was: Triage Needed) > Add script to mass-comment Jenkins triggers on PR > - > > Key: BEAM-10038 > URL: https://issues.apache.org/jira/browse/BEAM-10038 > Project: Beam > Issue Type: Improvement > Components: build-system >Reporter: Kyle Weaver >Assignee: Kyle Weaver >Priority: P2 > > This is a work in progress, it just needs to be touched up and added to the > Beam repo: > https://gist.github.com/Ardagan/13e6031e8d1c9ebbd3029bf365c1a517 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10038) Add script to mass-comment Jenkins triggers on PR
Kyle Weaver created BEAM-10038: -- Summary: Add script to mass-comment Jenkins triggers on PR Key: BEAM-10038 URL: https://issues.apache.org/jira/browse/BEAM-10038 Project: Beam Issue Type: Improvement Components: build-system Reporter: Kyle Weaver Assignee: Kyle Weaver This is a work in progress; it just needs to be touched up and added to the Beam repo: https://gist.github.com/Ardagan/13e6031e8d1c9ebbd3029bf365c1a517 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435276&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435276 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 20/May/20 00:04 Start Date: 20/May/20 00:04 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631153847 Run Java Flink PortableValidatesRunner Batch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435276) Time Spent: 1h (was: 50m) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Priority: P2 > Time Spent: 1h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
Omar Ismail created BEAM-10037: -- Summary: BeamSqlExample.java fails to build when running ./gradlew command Key: BEAM-10037 URL: https://issues.apache.org/jira/browse/BEAM-10037 Project: Beam Issue Type: Improvement Components: dsl-sql Reporter: Omar Ismail Assignee: Omar Ismail In the `BeamSqlExample.java` class, the instructions state that to run the example, use: `./gradlew :sdks:java:extensions:sql:runBasicExample`. I tried this and the build failed due to `java.lang.IllegalStateException: Unable to return a default Coder` I will try to fix this! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (BEAM-10037) BeamSqlExample.java fails to build when running ./gradlew command
[ https://issues.apache.org/jira/browse/BEAM-10037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth Knowles updated BEAM-10037: --- Status: Open (was: Triage Needed) > BeamSqlExample.java fails to build when running ./gradlew command > - > > Key: BEAM-10037 > URL: https://issues.apache.org/jira/browse/BEAM-10037 > Project: Beam > Issue Type: Improvement > Components: dsl-sql >Reporter: Omar Ismail >Assignee: Omar Ismail >Priority: P3 > > In the `BeamSqlExample.java` class, the instructions state that to run the > example, use: > `./gradlew :sdks:java:extensions:sql:runBasicExample`. > I tried this and the build failed due to `java.lang.IllegalStateException: > Unable to return a default Coder` > > I will try to fix this! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9899) Add integration tests for cross-language SnowflakeIO Read and Write
[ https://issues.apache.org/jira/browse/BEAM-9899?focusedWorklogId=435270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435270 ] ASF GitHub Bot logged work on BEAM-9899: Author: ASF GitHub Bot Created on: 19/May/20 23:25 Start Date: 19/May/20 23:25 Worklog Time Spent: 10m Work Description: TheNeuralBit commented on a change in pull request #11701: URL: https://github.com/apache/beam/pull/11701#discussion_r427655332 ## File path: sdks/python/apache_beam/coders/row_coder.py ## @@ -134,19 +134,18 @@ def __init__(self, schema, components): def encode_to_stream(self, value, out, nested): nvals = len(self.schema.fields) self.SIZE_CODER.encode_to_stream(nvals, out, True) -attrs = [getattr(value, f.name) for f in self.schema.fields] Review comment: Yeah that's right. Right now it should only be possible to get here with a NamedTuple instance, so it should be safe. Looking forward to the day where more types might go through this code.. I kind of like the idea of using `tuple` as the "base" schema type in Python (i.e. the type we must be able to convert to/from for use in row coder). Relying on attributes isn't great since it limits us to field names that are valid python identifiers. All that being said I'd be fine dropping this part of the change for now. Renaming the special `id` field also fixes the bug. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435270) Time Spent: 40m (was: 0.5h) > Add integration tests for cross-language SnowflakeIO Read and Write > --- > > Key: BEAM-9899 > URL: https://issues.apache.org/jira/browse/BEAM-9899 > Project: Beam > Issue Type: Test > Components: io-ideas >Reporter: Dariusz Aniszewski >Priority: P2 > Time Spent: 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
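[Editorial sketch] The two extraction strategies weighed in the review above — per-field `getattr` versus treating the row as a plain tuple — can be contrasted in isolation. As the comment notes, attribute access only works for field names that are valid Python identifiers, while positional access places no constraint on names:

```python
# Beam-free contrast of the two row-extraction strategies discussed above:
# attribute-based (getattr per schema field) vs. positional (plain tuple).
from typing import NamedTuple

Row = NamedTuple("Row", [("user", str), ("score", int)])
field_names = ["user", "score"]
row = Row("alice", 42)

# getattr requires every field name to be a valid Python identifier.
by_attr = [getattr(row, name) for name in field_names]

# Positional access works for any field name, since a NamedTuple is a tuple.
by_position = list(tuple(row))
```

This is why using `tuple` as the "base" schema type, as suggested above, is attractive: the coder never needs to know whether a field name is attribute-safe.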
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435268&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435268 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 19/May/20 23:24 Start Date: 19/May/20 23:24 Worklog Time Spent: 10m Work Description: jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427654986 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -472,24 +551,118 @@ public void initClient() throws IOException { this.client = new HttpHealthcareApiClient(); } +@GetInitialRestriction +public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store) +throws IOException { + from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get()); + // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be + // included in results set to add an extra ms to the upper bound. + to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1); + return new OffsetRange(from.getMillis(), to.getMillis()); +} + +@NewTracker +public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) { + return timeRange.newTracker(); +} + +@SplitRestriction +public void split(@Restriction OffsetRange timeRange, OutputReceiver out) { + List splits = + timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis()); + Instant from = Instant.ofEpochMilli(timeRange.getFrom()); + Instant to = Instant.ofEpochMilli(timeRange.getTo()); + Duration totalDuration = new Duration(from, to); + LOG.info( + String.format( + "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), " + + "or [%s, %s). \n" + + "total days: %s \n" + + "into %s splits. 
\n" + + "Last split: %s", + from, + to, + timeRange.getFrom(), + timeRange.getTo(), + totalDuration.getStandardDays(), + splits.size(), + splits.get(splits.size() - 1).toString())); + + for (OffsetRange s : splits) { +out.output(s); + } +} + /** * List messages. * - * @param context the context + * @param hl7v2Store the HL7v2 store to list messages from * @throws IOException the io exception */ @ProcessElement -public void listMessages(ProcessContext context) throws IOException { - String hl7v2Store = context.element(); - // Output all elements of all pages. +public void listMessages( +@Element String hl7v2Store, +RestrictionTracker tracker, +OutputReceiver outputReceiver) +throws IOException { + OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction(); + Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom()); + Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo()); HttpHealthcareApiClient.HL7v2MessagePages pages = - new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter); + new HttpHealthcareApiClient.HL7v2MessagePages( + client, hl7v2Store, startRestriction, endRestriction, filter.get(), "sendTime"); long reqestTime = Instant.now().getMillis(); - for (Stream page : pages) { + long lastClaimedMilliSecond; + Instant cursor; + boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page. + for (List page : pages) { // loop over pages. 
+int i = 0; +HL7v2Message msg = page.get(i); +while (i < page.size()) { // loop over messages in page + cursor = Instant.parse(msg.getSendTime()); + lastClaimedMilliSecond = cursor.getMillis(); + LOG.info( + String.format( + "initial claim for page %s lastClaimedMilliSecond = %s", + i, lastClaimedMilliSecond)); + if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) { +// This means we have claimed an entire millisecond we need to make sure that we +// process all messages for this millisecond because sendTime is allegedly nano second +// resolution. +// https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message +while (cursor.getMillis() == lastClaimedMilliSecond +&& i < page.size()) { // loop over messages in millisecond. + outputReceiver.output(msg);
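[Editorial sketch] The paging logic under review above claims each distinct sendTime millisecond once, then drains every message in that millisecond — including messages that spill onto the next page (the "hangingClaim" flag). A simplified, Beam-free sketch of that control flow, with a fake tracker standing in for Beam's RestrictionTracker:

```python
# Beam-free sketch of the claim-then-drain pagination above: each distinct
# sendTime millisecond is claimed once, and all messages in that millisecond
# are emitted, even when they span a page boundary ("hangingClaim").

class FakeTracker:
    """Stand-in for a restriction tracker over [start, end) milliseconds."""
    def __init__(self, start, end):
        self.pos, self.end = start, end

    def try_claim(self, ms):
        if ms >= self.end:
            return False  # outside the restriction: caller must stop
        self.pos = ms
        return True

def list_messages(pages, tracker, output):
    claimed = None  # last millisecond successfully claimed
    for page in pages:               # loop over pages
        for send_ms, msg in page:    # loop over messages in a page
            if send_ms == claimed or tracker.try_claim(send_ms):
                claimed = send_ms    # a claim carries across page boundaries
                output.append(msg)
            else:
                return               # claim rejected: stop processing

out = []
pages = [[(1, "a"), (1, "b")], [(1, "c"), (2, "d")]]  # ms 1 spans two pages
list_messages(pages, FakeTracker(0, 3), out)
```

Draining the whole millisecond matters because, as the comment in the diff notes, sendTime has finer-than-millisecond resolution, so several messages can share one claimable offset.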
[jira] [Work logged] (BEAM-9977) Build Kafka Read on top of Java SplittableDoFn
[ https://issues.apache.org/jira/browse/BEAM-9977?focusedWorklogId=435265&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435265 ] ASF GitHub Bot logged work on BEAM-9977: Author: ASF GitHub Bot Created on: 19/May/20 23:21 Start Date: 19/May/20 23:21 Worklog Time Spent: 10m Work Description: boyuanzz commented on a change in pull request #11715: URL: https://github.com/apache/beam/pull/11715#discussion_r427654115

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java

@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.io.range.OffsetRange;
+
+/**
+ * An {@link OffsetRangeTracker} for tracking a growable offset range. {@code Long.MAX_VALUE} is
+ * used as the end of the range to indicate the possibility of infinity.
+ *
+ * An offset range is considered growable when the end offset could grow (or change) during
+ * execution time (e.g., Kafka topic partition offset, appended file, ...).
+ *
+ * The growable range is marked as done by claiming {@code Long.MAX_VALUE}.
+ */
+@Experimental(Kind.SPLITTABLE_DO_FN)
+public class GrowableOffsetRangeTracker extends OffsetRangeTracker {
+  /**
+   * An interface that should be implemented to fetch the estimated end offset of the range.
+   *
+   * {@code estimateRangeEnd} is called to give the end offset when {@code trySplit} or {@code
+   * getProgress} is invoked. The end offset is exclusive for the range. The estimated end need not
+   * increase monotonically, as it is only taken into account when it is larger than the current
+   * position. Returning {@code Long.MAX_VALUE} as the estimate means the largest possible position
+   * for the range is {@code Long.MAX_VALUE - 1}. If there is no estimate yet, {@code
+   * Long.MIN_VALUE} should be returned, in which case the estimated end will not affect progress or
+   * splitting.
+   *
+   * Having a good estimate is important for providing a good signal of progress and for splitting
+   * at a proper position.
+   *
+   * If {@code estimate()} is expensive to call, please consider wrapping the implementation with
+   * {@code Suppliers.memoizeWithExpiration} as an optimization.
+   */
+  @FunctionalInterface
+  public interface RangeEndEstimator {
+    long estimate();
+  }
+
+  private final RangeEndEstimator rangeEndEstimator;
+
+  public GrowableOffsetRangeTracker(long start, RangeEndEstimator rangeEndEstimator) {
+    super(new OffsetRange(start, Long.MAX_VALUE));
+    this.rangeEndEstimator = checkNotNull(rangeEndEstimator);
+  }
+
+  @Override
+  public SplitResult trySplit(double fractionOfRemainder) {
+    // If the current tracking range is no longer growable, split it as a normal range.
+    if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) {
+      return super.trySplit(fractionOfRemainder);
+    }
+    // If the current range has been done, there is no more space to split.
+    if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) {
+      return null;
+    }
+    double cur =

Review comment: Using `BigDecimal` in the latest revision. Thanks for your help!

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking --- Worklog Id: (was: 435265) Time Spent: 2h 50m (was: 2h 40m)

> Build Kafka Read on top of Java SplittableDoFn > -- > > Key: BEAM-9977 > URL: https://i
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435264&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435264 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 19/May/20 23:17 Start Date: 19/May/20 23:17 Worklog Time Spent: 10m Work Description: boyuanzz commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631136771 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435264) Time Spent: 50m (was: 40m) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Priority: P2 > Time Spent: 50m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435261&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435261 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 19/May/20 23:06 Start Date: 19/May/20 23:06 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631133371 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435261) Time Spent: 40m (was: 0.5h) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Priority: P2 > Time Spent: 40m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8889) Make GcsUtil use GoogleCloudStorage
[ https://issues.apache.org/jira/browse/BEAM-8889?focusedWorklogId=435258&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435258 ] ASF GitHub Bot logged work on BEAM-8889: Author: ASF GitHub Bot Created on: 19/May/20 23:03 Start Date: 19/May/20 23:03 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631132405 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435258) Remaining Estimate: 134.5h (was: 134h 40m) Time Spent: 33.5h (was: 33h 20m) > Make GcsUtil use GoogleCloudStorage > --- > > Key: BEAM-8889 > URL: https://issues.apache.org/jira/browse/BEAM-8889 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Affects Versions: 2.16.0 >Reporter: Esun Kim >Assignee: VASU NORI >Priority: P2 > Labels: gcs > Original Estimate: 168h > Time Spent: 33.5h > Remaining Estimate: 134.5h > > [GcsUtil|https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java] > is a primary class to access Google Cloud Storage on Apache Beam. Current > implementation directly creates GoogleCloudStorageReadChannel and > GoogleCloudStorageWriteChannel by itself to read and write GCS data rather > than using > [GoogleCloudStorage|https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorage.java] > which is an abstract class providing basic IO capability which eventually > creates channel objects. 
This request is about updating GcsUtil to use > GoogleCloudStorage to create read and write channel, which is expected > flexible because it can easily pick up the new change; e.g. new channel > implementation using new protocol without code change. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9899) Add integration tests for cross-language SnowflakeIO Read and Write
[ https://issues.apache.org/jira/browse/BEAM-9899?focusedWorklogId=435257&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435257 ] ASF GitHub Bot logged work on BEAM-9899: Author: ASF GitHub Bot Created on: 19/May/20 23:02 Start Date: 19/May/20 23:02 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #11701: URL: https://github.com/apache/beam/pull/11701#discussion_r427648180

## File path: sdks/python/apache_beam/coders/row_coder.py

@@ -134,19 +134,18 @@ def __init__(self, schema, components):
   def encode_to_stream(self, value, out, nested):
     nvals = len(self.schema.fields)
     self.SIZE_CODER.encode_to_stream(nvals, out, True)
-    attrs = [getattr(value, f.name) for f in self.schema.fields]

Review comment: This forces the value to be an iterable, rather than just having the right fields, right? Are we sure we want to do that?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking --- Worklog Id: (was: 435257) Time Spent: 0.5h (was: 20m)

> Add integration tests for cross-language SnowflakeIO Read and Write > --- > > Key: BEAM-9899 > URL: https://issues.apache.org/jira/browse/BEAM-9899 > Project: Beam > Issue Type: Test > Components: io-ideas >Reporter: Dariusz Aniszewski >Priority: P2 > Time Spent: 0.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
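The review question above is about a real behavioral difference: encoding fields via `getattr` accepts any object that exposes the schema's field names, while iterating over the value forces it to be a sequence in schema order. A small self-contained sketch of the contrast (the `fields`, `Row`, and `Duck` names are illustrative, not the row coder's actual code):

```python
from collections import namedtuple

fields = ["id", "name"]


def encode_by_getattr(value):
    # Works for any object exposing the schema's field names as attributes.
    return [getattr(value, f) for f in fields]


def encode_by_iteration(value):
    # Forces the value to be an iterable yielding fields in schema order.
    return list(value)


Row = namedtuple("Row", fields)
row = Row(id=1, name="x")


class Duck:
    """Has the right fields but is not iterable."""
    id, name = 1, "x"


by_attr_row = encode_by_getattr(row)      # [1, "x"]
by_attr_duck = encode_by_getattr(Duck())  # also [1, "x"]
by_iter_row = encode_by_iteration(row)    # [1, "x"]
# encode_by_iteration(Duck()) would raise TypeError: not iterable
```

So dropping the `getattr` line narrows the set of accepted values, which is exactly what the reviewer is flagging.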
[jira] [Work logged] (BEAM-8019) Support cross-language transforms for DataflowRunner
[ https://issues.apache.org/jira/browse/BEAM-8019?focusedWorklogId=435252&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435252 ] ASF GitHub Bot logged work on BEAM-8019: Author: ASF GitHub Bot Created on: 19/May/20 22:58 Start Date: 19/May/20 22:58 Worklog Time Spent: 10m Work Description: robertwb commented on a change in pull request #11740: URL: https://github.com/apache/beam/pull/11740#discussion_r427646588 ## File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ## @@ -310,15 +312,15 @@ def __init__( environment_payload = proto_utils.parse_Bytes( environment.payload, beam_runner_api_pb2.DockerPayload) container_image_url = environment_payload.container_image -if container_image_url == pipeline_sdk_container_image: - # This was already added +if container_image_url in already_added_containers: + # Do not add the pipeline environment again. Review comment: Perhaps also add a comment that currently dataflow stages all dependencies to all environments? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435252) Time Spent: 19h 50m (was: 19h 40m) > Support cross-language transforms for DataflowRunner > > > Key: BEAM-8019 > URL: https://issues.apache.org/jira/browse/BEAM-8019 > Project: Beam > Issue Type: New Feature > Components: sdk-py-core >Reporter: Chamikara Madhusanka Jayalath >Assignee: Chamikara Madhusanka Jayalath >Priority: P1 > Time Spent: 19h 50m > Remaining Estimate: 0h > > This is to capture the Beam changes needed for this task. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-10007) PortableRunner doesn't handle ValueProvider instances when converting pipeline options
[ https://issues.apache.org/jira/browse/BEAM-10007?focusedWorklogId=435251&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435251 ] ASF GitHub Bot logged work on BEAM-10007: - Author: ASF GitHub Bot Created on: 19/May/20 22:55 Start Date: 19/May/20 22:55 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #11744: URL: https://github.com/apache/beam/pull/11744#discussion_r427645484

## File path: sdks/python/apache_beam/runners/portability/portable_runner.py

@@ -164,10 +165,19 @@ def add_runner_options(parser):
     all_options = self.options.get_all_options(
         add_extra_args_fn=add_runner_options,
         retain_unknown_options=self._retain_unknown_options)
+    # TODO: Define URNs for options.
     # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
+    def convert_pipeline_option_value(v):
+      if type(v) == int:

Review comment: Interesting. For this PR, could you move that comment closer to the `if type(v) == int` line.

> I can look into fixing that upstream so we can get rid of the special case here.

If you have time, maybe file a bug. I do not think this is a very high priority for us.

> I don't think it would be good to call str(v) on everything

I agree. I was trying to understand why we are doing this.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435251) Time Spent: 1h (was: 50m) > PortableRunner doesn't handle ValueProvider instances when converting > pipeline options > -- > > Key: BEAM-10007 > URL: https://issues.apache.org/jira/browse/BEAM-10007 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Brian Hulette >Priority: P2 > Time Spent: 1h > Remaining Estimate: 0h > > We attempt to convert ValueProvider instances directly to JSON with > json_format, leading to errors like the one described in BEAM-9975. -- This message was sent by Atlassian Jira (v8.3.4#803005)
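The `convert_pipeline_option_value` snippet under review stringifies integer option values before the options are serialized via `json_format` (BEAM-5509), while leaving everything else untouched. A hedged sketch of what such a converter does; the surrounding dict and option names are hypothetical:

```python
def convert_pipeline_option_value(v):
    # Integer option values are stringified before JSON/proto conversion
    # (see BEAM-5509); other types pass through unchanged. Using type(v)
    # rather than isinstance() deliberately excludes bool, which is a
    # subclass of int and must not become the string "True".
    if type(v) == int:
        return str(v)
    return v


options = {"parallelism": 4, "streaming": True, "job_name": "wordcount"}
converted = {k: convert_pipeline_option_value(v) for k, v in options.items()}
# parallelism becomes "4"; the bool and the string pass through unchanged
```

This is also why the reviewers push back on calling `str(v)` on everything: only the int case needs the workaround, and the special case should eventually be fixed upstream.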
[jira] [Work logged] (BEAM-10007) PortableRunner doesn't handle ValueProvider instances when converting pipeline options
[ https://issues.apache.org/jira/browse/BEAM-10007?focusedWorklogId=435250&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435250 ] ASF GitHub Bot logged work on BEAM-10007: - Author: ASF GitHub Bot Created on: 19/May/20 22:54 Start Date: 19/May/20 22:54 Worklog Time Spent: 10m Work Description: aaltay commented on a change in pull request #11744: URL: https://github.com/apache/beam/pull/11744#discussion_r427645484 ## File path: sdks/python/apache_beam/runners/portability/portable_runner.py ## @@ -164,10 +165,19 @@ def add_runner_options(parser): all_options = self.options.get_all_options( add_extra_args_fn=add_runner_options, retain_unknown_options=self._retain_unknown_options) + # TODO: Define URNs for options. # convert int values: https://issues.apache.org/jira/browse/BEAM-5509 +def convert_pipeline_option_value(v): + if type(v) == int: Review comment: Interesting. For this PR, could you move that comment closer to the if type(v) == int line. > I can look into fixing that upstream so we can get rid of the special case here. If you have time, maybe file a bug. I do not think this is a very high priority for us. > I don't think it would be good to call str(v) on everything I agree. I was trying to understand why are we doing this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435250) Time Spent: 50m (was: 40m) > PortableRunner doesn't handle ValueProvider instances when converting > pipeline options > -- > > Key: BEAM-10007 > URL: https://issues.apache.org/jira/browse/BEAM-10007 > Project: Beam > Issue Type: Bug > Components: sdk-py-core >Reporter: Brian Hulette >Priority: P2 > Time Spent: 50m > Remaining Estimate: 0h > > We attempt to convert ValueProvider instances directly to JSON with > json_format, leading to errors like the one described in BEAM-9975. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435247&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435247 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 19/May/20 22:50 Start Date: 19/May/20 22:50 Worklog Time Spent: 10m Work Description: jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427643888 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. Review comment: note to self: remove reference to "dynamically rebalance" as this is not yet supported by dataflow runner. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435247) Time Spent: 7h 50m (was: 7h 40m) > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 7h 50m > Remaining Estimate: 0h > > Currently the List Messages API paginates through in a single ProcessElement > Call. > However we could get a restriction based on createTime using Messages.List > filter and orderby. > > This is in line with the future roadmap, where an HL7v2 bulk export API becomes > available that should allow splitting (e.g. on the create time dimension). > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dynamically splittable via splittable DoFn (sexy, beam idiomatic: make > optimization the runner's problem, potentially unnecessarily complex for this > use case) > 2. static splitting on some time partition e.g. finding the earliest > createTime and emitting a PCollection of 1 hour partitions and paginating > through each hour of data within the time frame that the store spans, in a > separate ProcessElement. (easy to implement but will likely have hot keys / > stragglers based on "busy hours") > -- This message was sent by Atlassian Jira (v8.3.4#803005)
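Option 2 in the issue description (static splitting on a time partition) amounts to slicing the store's [min sendTime, max sendTime) span into fixed-width, non-overlapping ranges and listing each range in its own ProcessElement call. A sketch of the partitioning step, assuming epoch-millisecond timestamps; the function name is illustrative:

```python
def partition_time_range(start_ms, end_ms, width_ms):
    """Split [start_ms, end_ms) into consecutive half-open ranges of width_ms.

    The last range is clipped so the partitions exactly tile the span
    without overlapping -- each message's sendTime falls in one range.
    """
    splits = []
    lo = start_ms
    while lo < end_ms:
        hi = min(lo + width_ms, end_ms)
        splits.append((lo, hi))
        lo = hi
    return splits


HOUR_MS = 3600 * 1000
# A store spanning 2.5 hours yields three partitions; the last is shorter.
parts = partition_time_range(0, int(2.5 * HOUR_MS), HOUR_MS)
```

The trade-off named in the issue shows up directly here: fixed-width partitions are easy to compute, but "busy hours" land entirely in one partition and become stragglers, which is what the splittable-DoFn variant avoids.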
[jira] [Work logged] (BEAM-9723) [Java] PTransform that connects to Cloud DLP deidentification service
[ https://issues.apache.org/jira/browse/BEAM-9723?focusedWorklogId=435246&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435246 ] ASF GitHub Bot logged work on BEAM-9723: Author: ASF GitHub Bot Created on: 19/May/20 22:49 Start Date: 19/May/20 22:49 Worklog Time Spent: 10m Work Description: tysonjh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427403454 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings. The transform supports both CSV formatted input data and unstructured input.
+ *
+ * If the csvHeader property is set, csvDelimiter also should be, else the results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} needs to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link
+ * DeidentifyConfig}).
+ *
+ * Batch size defines, in bytes, how large the batches sent to DLP at once are.

Review comment: Are you agreeing that the comments should be moved to the methods, or that the comments are also useful here for the template inspection? (I'm unfamiliar with the 'inspect contents' and 'inspect template' actions.)
## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
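The javadoc above says batch size bounds, in bytes, how much data each DLP request carries. A language-agnostic sketch of that kind of byte-budgeted batching; this is a hypothetical illustration of the idea, not the transform's actual batching code:

```python
def batch_by_bytes(items, max_bytes):
    """Group strings into batches whose total UTF-8 size stays within max_bytes.

    A single oversized item still forms its own batch rather than being
    dropped; real code would need to split or reject such an item.
    """
    batches, current, current_size = [], [], 0
    for item in items:
        size = len(item.encode("utf-8"))
        if current and current_size + size > max_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(item)
        current_size += size
    if current:
        batches.append(current)
    return batches


rows = ["alpha", "beta", "gamma", "delta"]
batches = batch_by_bytes(rows, max_bytes=10)
```

With a 10-byte budget, "alpha" and "beta" (9 bytes) share a batch, then "gamma" and "delta" (10 bytes) fill the next, so two requests go to the service instead of four.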
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435245&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435245 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 19/May/20 22:48 Start Date: 19/May/20 22:48 Worklog Time Spent: 10m Work Description: jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427643387 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of Review comment: ```suggestion * This transform is optimized for splitting of message.list calls for large batches of ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435245) Time Spent: 7h 40m (was: 7.5h) > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 7h 40m > Remaining Estimate: 0h > > Currently the List Messages API paginates through in a single ProcessElement > Call. > However we could get a restriction based on createTime using Messages.List > filter and orderby. 
> > This is inline with the future roadmap of HL7v2 bulk export API becomes > available that should allow splitting on (e.g. create time dimension). > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dyanmically splitable via splitable DoFn (sexy, beam idiomatic: make > optimization the runner's problem, potentially unnecessarily complex for this > use case ) > 2. static splitting on some time partition e.g. finding the earliest > createTime and emitting a PCollection of 1 hour partitions and paginating > through each hour of data w/ in the time frame that the store spans, in a > separate ProcessElement. (easy to implement but will likely have hot keys / > stragglers based on "busy hours") > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435244&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435244 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 19/May/20 22:46 Start Date: 19/May/20 22:46 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631126898 R: @boyuanzz This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435244) Time Spent: 0.5h (was: 20m) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Priority: P2 > Time Spent: 0.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435243&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435243 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 19/May/20 22:46 Start Date: 19/May/20 22:46 Worklog Time Spent: 10m Work Description: lukecwik commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631126805 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435243) Time Spent: 20m (was: 10m) > Support Dynamic Timer in Java SDK over FnApi > > > Key: BEAM-9603 > URL: https://issues.apache.org/jira/browse/BEAM-9603 > Project: Beam > Issue Type: New Feature > Components: sdk-java-harness >Reporter: Boyuan Zhang >Priority: P2 > Time Spent: 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435240&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435240 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 19/May/20 22:42 Start Date: 19/May/20 22:42 Worklog Time Spent: 10m Work Description: jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427641350 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. + * + * Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of + * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges + * (default daily) to achieve parallelization in separate messages.list calls. + * + * This will make more queries than necessary when used with very small data sets. (or very + * sparse data sets in the sendTime dimension). + * + * If you have large but sparse data (e.g. hours between consecutive message sendTimes) and + * know something about the time ranges where you have no data, consider using multiple instances + * of this transform specifying sendTime filters to omit the ranges where there is no data. Review comment: That's great to know! will remove this guidance as it will lead to unnecessary complexity. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435240) Time Spent: 7.5h (was: 7h 20m) > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 7.5h > Remaining Estimate: 0h > > Currently the List Messages API paginates through in a single ProcessElement > call. > However, we could get a restriction based on createTime using Messages.List > filter and orderBy. > > This is in line with the future roadmap: as the HL7v2 bulk export API becomes > available, it should allow splitting (e.g. on the create time dimension). > Leveraging this bulk export might be a future optimization to explore. > > This could take one of two forms: > 1. dynamically splittable via a splittable DoFn (sexy, Beam idiomatic: makes > optimization the runner's problem; potentially unnecessarily complex for this > use case) > 2. static splitting on some time partition, e.g. finding the earliest > createTime and emitting a PCollection of 1 hour partitions and paginating > through each hour of data within the time frame that the store spans, in a > separate ProcessElement. (easy to implement but will likely have hot keys / > stragglers based on "busy hours") > -- This message was sent by Atlassian Jira (v8.3.4#803005)
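The initial-splitting step described in the javadoc above (determine min/max sendTime, then cut the interval into non-overlapping default-daily ranges, one messages.list call per range) can be sketched as follows. This is a minimal illustration only; `TimeRangeSplitter` and `TimeRange` are hypothetical names, not the actual HL7v2IO code.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Sketch of splitting [minSendTime, maxSendTime) into non-overlapping ranges. */
public class TimeRangeSplitter {

  /** A half-open time range [start, end). */
  public static final class TimeRange {
    public final Instant start;
    public final Instant end;

    public TimeRange(Instant start, Instant end) {
      this.start = start;
      this.end = end;
    }
  }

  /** Splits [min, max) into consecutive ranges of at most splitDuration each. */
  public static List<TimeRange> split(Instant min, Instant max, Duration splitDuration) {
    List<TimeRange> ranges = new ArrayList<>();
    Instant cursor = min;
    while (cursor.isBefore(max)) {
      Instant next = cursor.plus(splitDuration);
      if (next.isAfter(max)) {
        next = max; // clamp the final partial range
      }
      ranges.add(new TimeRange(cursor, next));
      cursor = next;
    }
    return ranges;
  }

  public static void main(String[] args) {
    Instant min = Instant.parse("2020-05-01T00:00:00Z");
    Instant max = Instant.parse("2020-05-04T12:00:00Z");
    List<TimeRange> ranges = split(min, max, Duration.ofDays(1));
    // 3 full days plus one clamped 12-hour remainder
    if (ranges.size() != 4) throw new AssertionError();
    if (!ranges.get(3).end.equals(max)) throw new AssertionError();
  }
}
```

Each resulting range would then back one messages.list call, which is why very small or very sparse stores pay the overhead the javadoc warns about: every empty range still costs a query.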
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435239&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435239 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 19/May/20 22:41 Start Date: 19/May/20 22:41 Worklog Time Spent: 10m Work Description: jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427640873 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. + * + * Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of + * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges + * (default daily) to achieve parallelization in separate messages.list calls. Review comment: I originally included this for users who may try to benchmark this against tiny / sparse results set and be surprised why it is slow / making so many api calls. I see your point will remove. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435239) Time Spent: 7h 20m (was: 7h 10m) > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 7h 20m > Remaining Estimate: 0h -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435236&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435236 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 19/May/20 22:39 Start Date: 19/May/20 22:39 Worklog Time Spent: 10m Work Description: jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427640111 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. Review comment: correct. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435236) Time Spent: 7h 10m (was: 7h) > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 7h 10m > Remaining Estimate: 0h > > Currently the List Messages API paginates through in a single ProcessElement > Call. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435235&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435235 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 19/May/20 22:37 Start Date: 19/May/20 22:37 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427618071 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of Review comment: Consider using `{@code ...}` when referring to code and `{@link ...}` for things you can directly link against. ```suggestion * This transform is optimized for dynamic splitting of {@code message.list} calls for large batches of ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435235) Time Spent: 7h (was: 6h 50m) > HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization > -- > > Key: BEAM-9856 > URL: https://issues.apache.org/jira/browse/BEAM-9856 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Reporter: Jacob Ferriero >Assignee: Jacob Ferriero >Priority: P3 > Time Spent: 7h > Remaining Estimate: 0h > > Currently the List Messages API paginates through in a single ProcessElement > Call. > However we could get a restriction based on createTime using Messages.List > filter and orderby. 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
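Applied to the javadoc under review, the `{@code ...}` suggestion above would look roughly like the sketch below. The wording is illustrative only, not the merged text; the point is the use of `{@code}` for inline code references in doc comments.

```java
/**
 * List HL7v2 messages in HL7v2 Stores with an optional filter.
 *
 * <p>This transform is optimized for dynamic splitting of {@code messages.list}
 * calls for large batches of historical data, and assumes a fairly continuous
 * stream of {@code sendTime} values.
 *
 * <p>The implementation includes overhead for two API calls to determine the
 * min/max {@code sendTime} of the store at invocation time, and for initial
 * splitting into non-overlapping time ranges (default daily) to achieve
 * parallelization in separate {@code messages.list} calls.
 */
public class ListHL7v2MessagesDoc {}
```

`{@code}` renders the text in code font and suppresses HTML interpretation, while `{@link}` additionally produces a hyperlink, so it is preferred for identifiers that javadoc can resolve.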
[jira] [Work logged] (BEAM-9856) HL7v2IO.ListHL7v2Messages should be refactored to support more parallelization
[ https://issues.apache.org/jira/browse/BEAM-9856?focusedWorklogId=435231&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435231 ] ASF GitHub Bot logged work on BEAM-9856: Author: ASF GitHub Bot Created on: 19/May/20 22:30 Start Date: 19/May/20 22:30 Worklog Time Spent: 10m Work Description: lukecwik commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427616856 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. Review comment: wouldn' this just be a small amount of waste since we would effectively get an empty response? ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. 
+ * + * Implementation includes overhead for: 1. two api calls Review comment: consider using `<ol>` and `<li>` tags in the javadoc for your ordered list ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. + * + * Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of + * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges + * (default daily) to achieve parallelization in separate messages.list calls. + * + * This will make more queries than necessary when used with very small data sets. (or very + * sparse data sets in the sendTime dimension). + * + * If you have large but sparse data (e.g. hours between consecutive message sendTimes) and + * know something about the time ranges where you have no data, consider using multiple instances + * of this transform specifying sendTime filters to omit the ranges where there is no data. + */ public static class ListHL7v2Messages extends PTransform> { -private final List<String> hl7v2Stores; -private final String filter; +private final ValueProvider<List<String>> hl7v2Stores; +private ValueProvider<String> filter; +private Duration initialSplitDuration; Review comment: even if a member variable is null, it should still be final since it doesn't look like we mutate it locally.
Same reason for other places I suggest to change this. ```suggestion private final ValueProvider<String> filter; private final Duration initialSplitDuration; ``` ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) * @param filter the filter */ ListHL7v2Messages(ValueProvider<List<String>> hl7v2Stores, ValueProvider<String> filter) { - this.hl7v2Stores = hl7v2Stores.get(); - this.filter = filter.get(); + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; +} +/** + * Instantiates a new List hl 7 v 2 messages. + *
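The diff above changes the constructor to store the `ValueProvider` itself instead of calling `.get()` at construction time. The sketch below illustrates why that matters: a runtime-supplied option may not exist yet when the transform is constructed. `ValueProviderLike` is a stand-in interface for this example, not Beam's actual `ValueProvider` API.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Illustration of deferring ValueProvider.get() to execution time. */
public class DeferredConfig {

  /** Hypothetical stand-in for a deferred value holder. */
  interface ValueProviderLike<T> extends Supplier<T> {}

  static final class ListMessages {
    // final: assigned once in the constructor and never mutated afterwards
    private final ValueProviderLike<String> filter;

    ListMessages(ValueProviderLike<String> filter) {
      // Store the provider itself. Calling filter.get() here would freeze the
      // value at pipeline-construction time and break runtime-supplied options.
      this.filter = filter;
    }

    String expand() {
      return "filter=" + filter.get(); // resolved at execution time
    }
  }

  public static void main(String[] args) {
    AtomicReference<String> runtimeValue = new AtomicReference<>("unset");
    ListMessages transform = new ListMessages(runtimeValue::get);
    // The option arrives only after the transform was constructed.
    runtimeValue.set("sendTime >= \"2020-05-01\"");
    if (!transform.expand().equals("filter=sendTime >= \"2020-05-01\"")) {
      throw new AssertionError();
    }
  }
}
```

This also shows the `final` point from the review: since the field is assigned exactly once in the constructor, marking it `final` costs nothing and documents the intent, even when the stored reference may be null.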
[jira] [Assigned] (BEAM-9515) ArrayScanToUncollectConverter Unnest does not support sub-queries
[ https://issues.apache.org/jira/browse/BEAM-9515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Pilloud reassigned BEAM-9515: Assignee: (was: Andrew Pilloud) > ArrayScanToUncollectConverter Unnest does not support sub-queries > - > > Key: BEAM-9515 > URL: https://issues.apache.org/jira/browse/BEAM-9515 > Project: Beam > Issue Type: New Feature > Components: dsl-sql-zetasql >Reporter: Andrew Pilloud >Priority: P2 > > {code:java} > Mar 16, 2020 1:00:02 PM > cloud.dataflow.sql.ExecuteQueryServiceServer$SqlComplianceServiceImpl > executeQuery > INFO: Processing Sql statement: SELECT * FROM UNNEST(ARRAY( > SELECT bool_val FROM AllTypesTable t > ORDER BY bool_val ASC > )) x WITH OFFSET POS > Mar 16, 2020 1:00:02 PM > com.google.zetasql.io.grpc.internal.SerializingExecutor run > SEVERE: Exception while executing runnable > com.google.zetasql.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@7b42f9e7 > java.lang.ClassCastException: > com.google.zetasql.resolvedast.ResolvedNodes$ResolvedSubqueryExpr cannot be > cast to com.google.zetasql.resolvedast.ResolvedNodes$ResolvedLiteral > at > org.apache.beam.sdk.extensions.sql.zetasql.translation.ArrayScanToUncollectConverter.convert(ArrayScanToUncollectConverter.java:45) > at > org.apache.beam.sdk.extensions.sql.zetasql.translation.ArrayScanToUncollectConverter.convert(ArrayScanToUncollectConverter.java:31) > at > org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:97) > at > java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) > at java.util.Collections$2.tryAdvance(Collections.java:4717) > at java.util.Collections$2.forEachRemaining(Collections.java:4725) > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) > at > java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) > at > 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) > at > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) > at > org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:96) > at > org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convert(QueryStatementConverter.java:84) > at > org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertRootQuery(QueryStatementConverter.java:51) > at > org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:160) > at > org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:131) > at > org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:115) > at > cloud.dataflow.sql.ExecuteQueryServiceServer$SqlComplianceServiceImpl.executeQuery(ExecuteQueryServiceServer.java:242) > at > com.google.zetasql.testing.SqlComplianceServiceGrpc$MethodHandlers.invoke(SqlComplianceServiceGrpc.java:423) > at > com.google.zetasql.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171) > at > com.google.zetasql.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283) > at > com.google.zetasql.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:711) > at > com.google.zetasql.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at > com.google.zetasql.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at 
java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
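The stack trace above comes from an unconditional downcast: the converter assumes the UNNEST argument is always a literal, so a sub-query argument throws `ClassCastException` at `ArrayScanToUncollectConverter.convert`. The sketch below reproduces the failure mode and a defensive variant with simplified stand-in node types, not the real ZetaSQL resolved-AST classes.

```java
/** Illustration of the unchecked-cast failure and an instanceof-guarded fix. */
public class CastExample {
  interface ResolvedExpr {}
  static final class ResolvedLiteral implements ResolvedExpr {}
  static final class ResolvedSubqueryExpr implements ResolvedExpr {}

  /** Mirrors the failing pattern: assumes the expression is always a literal. */
  static ResolvedLiteral convertUnchecked(ResolvedExpr e) {
    return (ResolvedLiteral) e; // throws ClassCastException for sub-queries
  }

  /** Defensive variant: rejects unsupported shapes with a descriptive error. */
  static ResolvedLiteral convertChecked(ResolvedExpr e) {
    if (!(e instanceof ResolvedLiteral)) {
      throw new UnsupportedOperationException(
          "UNNEST of " + e.getClass().getSimpleName() + " is not supported");
    }
    return (ResolvedLiteral) e;
  }

  public static void main(String[] args) {
    boolean threwCce = false;
    try {
      convertUnchecked(new ResolvedSubqueryExpr());
    } catch (ClassCastException ex) {
      threwCce = true; // the bug reported in BEAM-9515
    }
    if (!threwCce) throw new AssertionError();
    if (convertChecked(new ResolvedLiteral()) == null) throw new AssertionError();
  }
}
```

Actually supporting sub-queries would require converting the `ResolvedSubqueryExpr` branch rather than rejecting it; the guarded cast only turns an opaque crash into an actionable error message.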
[jira] [Work logged] (BEAM-8889) Make GcsUtil use GoogleCloudStorage
[ https://issues.apache.org/jira/browse/BEAM-8889?focusedWorklogId=435215&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435215 ] ASF GitHub Bot logged work on BEAM-8889: Author: ASF GitHub Bot Created on: 19/May/20 21:58 Start Date: 19/May/20 21:58 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631105651 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435215) Remaining Estimate: 134h 40m (was: 134h 50m) Time Spent: 33h 20m (was: 33h 10m) > Make GcsUtil use GoogleCloudStorage > --- > > Key: BEAM-8889 > URL: https://issues.apache.org/jira/browse/BEAM-8889 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Affects Versions: 2.16.0 >Reporter: Esun Kim >Assignee: VASU NORI >Priority: P2 > Labels: gcs > Original Estimate: 168h > Time Spent: 33h 20m > Remaining Estimate: 134h 40m > > [GcsUtil|https://github.com/apache/beam/blob/master/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java] > is a primary class to access Google Cloud Storage on Apache Beam. Current > implementation directly creates GoogleCloudStorageReadChannel and > GoogleCloudStorageWriteChannel by itself to read and write GCS data rather > than using > [GoogleCloudStorage|https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorage.java] > which is an abstract class providing basic IO capability which eventually > creates channel objects. 
This request is about updating GcsUtil to use > GoogleCloudStorage to create read and write channels, which is expected > to be more flexible because it can easily pick up new changes, e.g. a new > channel implementation using a new protocol, without code changes. -- This message was sent by Atlassian Jira (v8.3.4#803005)
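The refactoring direction described above is the classic dependency-inversion move: callers depend on an abstract storage interface rather than constructing concrete channel classes themselves. The sketch below uses a simplified stand-in interface, not the real `GoogleCloudStorage` API.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Illustration of programming against a storage abstraction. */
public class StorageAbstraction {

  /** Hypothetical stand-in for an abstract storage layer. */
  interface Storage {
    InputStream open(String path);
  }

  /** In-memory implementation for the example; a real one would open a GCS channel. */
  static final class InMemoryStorage implements Storage {
    private final Map<String, byte[]> blobs = new HashMap<>();

    void put(String path, String data) {
      blobs.put(path, data.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public InputStream open(String path) {
      return new ByteArrayInputStream(blobs.get(path));
    }
  }

  /** Caller code depends only on Storage, never on a concrete channel class. */
  static String readAll(Storage storage, String path) {
    try (InputStream in = storage.open(path)) {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static String demo() {
    InMemoryStorage storage = new InMemoryStorage();
    storage.put("gs://bucket/obj", "hello");
    return readAll(storage, "gs://bucket/obj");
  }

  public static void main(String[] args) {
    if (!demo().equals("hello")) throw new AssertionError();
  }
}
```

Swapping in a new channel implementation (e.g. one speaking a new protocol) then only requires a new `Storage` implementation; none of the caller code changes, which is exactly the flexibility the issue asks for.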
[jira] [Work logged] (BEAM-8889) Make GcsUtil use GoogleCloudStorage
[ https://issues.apache.org/jira/browse/BEAM-8889?focusedWorklogId=435214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435214 ] ASF GitHub Bot logged work on BEAM-8889: Author: ASF GitHub Bot Created on: 19/May/20 21:58 Start Date: 19/May/20 21:58 Worklog Time Spent: 10m Work Description: chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631105532 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435214) Remaining Estimate: 134h 50m (was: 135h) Time Spent: 33h 10m (was: 33h) > Make GcsUtil use GoogleCloudStorage > --- > > Key: BEAM-8889 > URL: https://issues.apache.org/jira/browse/BEAM-8889 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Affects Versions: 2.16.0 >Reporter: Esun Kim >Assignee: VASU NORI >Priority: P2 > Labels: gcs > Original Estimate: 168h > Time Spent: 33h 10m > Remaining Estimate: 134h 50m -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-8889) Make GcsUtil use GoogleCloudStorage
[ https://issues.apache.org/jira/browse/BEAM-8889?focusedWorklogId=435213&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435213 ] ASF GitHub Bot logged work on BEAM-8889: Author: ASF GitHub Bot Created on: 19/May/20 21:56 Start Date: 19/May/20 21:56 Worklog Time Spent: 10m Work Description: veblush commented on a change in pull request #11651: URL: https://github.com/apache/beam/pull/11651#discussion_r427623777 ## File path: buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy ## @@ -489,6 +490,7 @@ class BeamModulePlugin implements Plugin { grpc_protobuf : "io.grpc:grpc-protobuf:$grpc_version", grpc_protobuf_lite : "io.grpc:grpc-protobuf-lite:$grpc_version", grpc_netty : "io.grpc:grpc-netty:$grpc_version", +grpc_netty_shaded : "io.grpc:grpc-netty-shaded:$grpc_version", Review comment: Note that current beam already has it from [gax-grpc](https://mvnrepository.com/artifact/com.google.api/gax-grpc/1.54.0) transitively. This can make sure that all these components are working with the same version. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking --- Worklog Id: (was: 435213) Remaining Estimate: 135h (was: 135h 10m) Time Spent: 33h (was: 32h 50m) > Make GcsUtil use GoogleCloudStorage > --- > > Key: BEAM-8889 > URL: https://issues.apache.org/jira/browse/BEAM-8889 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp >Affects Versions: 2.16.0 >Reporter: Esun Kim >Assignee: VASU NORI >Priority: P2 > Labels: gcs > Original Estimate: 168h > Time Spent: 33h > Remaining Estimate: 135h -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Work logged] (BEAM-9603) Support Dynamic Timer in Java SDK over FnApi
[ https://issues.apache.org/jira/browse/BEAM-9603?focusedWorklogId=435212&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435212 ] ASF GitHub Bot logged work on BEAM-9603: Author: ASF GitHub Bot Created on: 19/May/20 21:52 Start Date: 19/May/20 21:52 Worklog Time Spent: 10m Work Description: y1chi opened a new pull request #11753: URL: https://github.com/apache/beam/pull/11753 Implemented the missing pieces in FnApiDoFnRunner to support timer family. Also refactored a few function signatures to avoid confusion. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch): Jenkins badge table omitted.
[jira] [Assigned] (BEAM-10015) output timestamp not properly propagated through the Dataflow runner
[ https://issues.apache.org/jira/browse/BEAM-10015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver reassigned BEAM-10015: -- Assignee: Kyle Weaver > output timestamp not properly propagated through the Dataflow runner > > > Key: BEAM-10015 > URL: https://issues.apache.org/jira/browse/BEAM-10015 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Reuven Lax >Assignee: Kyle Weaver >Priority: P1 > Fix For: 2.21.0 > > Time Spent: 40m > Remaining Estimate: 0h > > Dataflow runner does not propagate the output timestamp into timer firing, > resulting in incorrect default timestamps when outputting from a processTimer. -- This message was sent by Atlassian Jira (v8.3.4#803005)
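The bug described above (outputs from `processTimer` getting an incorrect default timestamp because the runner drops the timer's output timestamp) can be modeled with a small sketch. The classes below are simplified stand-ins for Beam's timer machinery, not the actual SDK types.

```java
import java.time.Instant;

/** Model of a timer's firing time vs. its configured output timestamp. */
public class TimerTimestamps {
  static final class Timer {
    final Instant firingTime;
    final Instant outputTimestamp; // may be earlier than firingTime; nullable

    Timer(Instant firingTime, Instant outputTimestamp) {
      this.firingTime = firingTime;
      this.outputTimestamp = outputTimestamp;
    }
  }

  /** Correct behavior: elements default to the timer's output timestamp. */
  static Instant defaultOutputTimestamp(Timer t) {
    return t.outputTimestamp != null ? t.outputTimestamp : t.firingTime;
  }

  /** Buggy behavior: the output timestamp is dropped; firing time is used. */
  static Instant buggyDefaultOutputTimestamp(Timer t) {
    return t.firingTime;
  }

  public static void main(String[] args) {
    Timer t = new Timer(Instant.parse("2020-05-19T00:10:00Z"),
                        Instant.parse("2020-05-19T00:00:00Z"));
    if (!defaultOutputTimestamp(t).equals(t.outputTimestamp)) throw new AssertionError();
    // The buggy variant stamps the output 10 minutes too late, which can also
    // let the watermark pass the intended output time and make data late.
    if (buggyDefaultOutputTimestamp(t).equals(t.outputTimestamp)) throw new AssertionError();
  }
}
```

When the output timestamp is earlier than the firing time, losing it is not just a cosmetic defect: downstream windowing sees the element at the wrong event time, which matches the P1 severity of the issue.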
[jira] [Assigned] (BEAM-10015) output timestamp not properly propagated through the Dataflow runner
[ https://issues.apache.org/jira/browse/BEAM-10015?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kyle Weaver reassigned BEAM-10015: -- Assignee: Reuven Lax (was: Kyle Weaver) > output timestamp not properly propagated through the Dataflow runner > > > Key: BEAM-10015 > URL: https://issues.apache.org/jira/browse/BEAM-10015 > Project: Beam > Issue Type: Bug > Components: runner-dataflow >Reporter: Reuven Lax >Assignee: Reuven Lax >Priority: P1 > Fix For: 2.21.0 > > Time Spent: 40m > Remaining Estimate: 0h -- This message was sent by Atlassian Jira (v8.3.4#803005)