[GitHub] [beam] dmvk merged pull request #11750: [BEAM-9900] Fix polling behavior in UnboundedSourceWrapper
dmvk merged pull request #11750: URL: https://github.com/apache/beam/pull/11750 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] dmvk commented on pull request #11750: [BEAM-9900] Fix polling behavior in UnboundedSourceWrapper
dmvk commented on pull request #11750: URL: https://github.com/apache/beam/pull/11750#issuecomment-631269828 Thanks for the fix. 👍 🎉
[GitHub] [beam] jaketf edited a comment on pull request #11339: [BEAM-9468] Fhir io
jaketf edited a comment on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631265943 I think that something about `TestPubsubSignal` does not play well with the way this integration test suite runs on Dataflow. Following suit with `PubsubReadIT` (which also uses `TestPubsubSignal`), `BigQueryIOReadIT`, and `BigQueryIOStorageReadTableRowIT`, I've added this test to the excludes in this project's build file. The test can still be run (and passes) using DirectRunner, and the evidence in the [above comment](https://github.com/apache/beam/pull/11339#issuecomment-631203002) points to this being an issue with TestPubsubSignal and the Dataflow runner rather than with the behavior under test. Potentially related issue: [BEAM-6804](https://issues.apache.org/jira/browse/BEAM-6804?jql=text%20~%20%22TestPubsubSignal%22). I've filed [BEAM-10040](https://issues.apache.org/jira/browse/BEAM-10040) for this instance. @pabloem please let me know if this is acceptable.
[GitHub] [beam] damondouglas commented on pull request #11734: [BEAM-9679] Add Core Transforms section / GroupByKey lesson to the Go SDK katas
damondouglas commented on pull request #11734: URL: https://github.com/apache/beam/pull/11734#issuecomment-631261638 @lostluck and @henryken, I've updated the [stepik](https://stepik.org/course/70387) course and committed the `*-remote.yaml` files to this PR. Thank you again for all your help and guidance.
[GitHub] [beam] jaketf edited a comment on pull request #11339: [BEAM-9468] Fhir io
jaketf edited a comment on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002 The issue with FhirIOReadIT appears to be some misuse of [TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html). The [example job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing) clearly has the expected >2000 elements added to the "waitForAnyMessage" step, but the success signal never gets published to the results topic. Notably, there are job-level warnings about metric descriptors and [warnings in the shuffle logs](https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054×tamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z):

```
"Update range task returned 'invalid argument'. Assuming lost lease for work with id 5061980071068333770 (expiration time: 1589940982000, now: 1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors.";
```

The [docs](https://cloud.google.com/dataflow/docs/guides/common-errors#bad-request-shuffler-logs) say this can be ignored, but it looks suspicious here. Either way, it is orthogonal to the behavior being tested. Investigating other means of performing this test.
[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427759019

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

```diff
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {
```

Review comment: It probably makes sense to keep the `set` shortcut, since it is the most frequently used operation.
[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427758607

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

```diff
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {
```

Review comment: This is the interface required by TimerMap, though: https://github.com/apache/beam/blob/9108832cf1cb57161997e16190dbc6eccdc10492/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerMap.java#L25
[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631254015 Have you run the linkage checker? https://cwiki.apache.org/confluence/display/BEAM/Dependency+Upgrades
[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631252638 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631252696 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] chamikaramj commented on a change in pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image
chamikaramj commented on a change in pull request #11740: URL: https://github.com/apache/beam/pull/11740#discussion_r427755500

## File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py

```diff
@@ -310,15 +312,15 @@ def __init__(
         environment_payload = proto_utils.parse_Bytes(
             environment.payload, beam_runner_api_pb2.DockerPayload)
         container_image_url = environment_payload.container_image
-        if container_image_url == pipeline_sdk_container_image:
-          # This was already added
+        if container_image_url in already_added_containers:
+          # Do not add the pipeline environment again.
```

Review comment: Sent https://github.com/apache/beam/pull/11757.
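The diff above replaces an equality check against the single pipeline SDK image with membership in an `already_added_containers` set, so each container image is registered at most once no matter how many environments reference it. A minimal standalone sketch of that dedup idea (plain Java with made-up names, not the Beam apiclient itself):

```java
import java.util.LinkedHashSet;
import java.util.List;

public class ContainerDedup {
  // Returns the input image URLs with duplicates dropped: first occurrence
  // wins and original order is preserved, mirroring the "skip if already
  // added" check in the diff above.
  static List<String> uniqueImages(List<String> imageUrls) {
    return List.copyOf(new LinkedHashSet<>(imageUrls));
  }

  public static void main(String[] args) {
    List<String> images =
        List.of("gcr.io/example/sdk:2.20", "gcr.io/example/user:1", "gcr.io/example/sdk:2.20");
    // The SDK image appears twice in the input but only once in the output.
    System.out.println(uniqueImages(images));
  }
}
```

A `LinkedHashSet` is used rather than `HashSet` so the staged order stays deterministic, which matters when the first entry is treated specially (as the pipeline environment is here).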
[GitHub] [beam] chamikaramj commented on pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model
chamikaramj commented on pull request #11757: URL: https://github.com/apache/beam/pull/11757#issuecomment-631251258 R: @robertwb @ihji
[GitHub] [beam] chamikaramj opened a new pull request #11757: [BEAM-8019] Clarifies Dataflow execution environment model
chamikaramj opened a new pull request #11757: URL: https://github.com/apache/beam/pull/11757

**Please** add a meaningful description for your change here

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
Post-Commit Tests Status (on master branch): [build-status badge table omitted]
[GitHub] [beam] boyuanzz commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427750709

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

```diff
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {
```

Review comment: Thanks for the clarification! I still think we should have a better API (and docs) here, such as `getTimer(timerId)`. I would also prefer not exposing `set()`, since `getTimer()` is the more recommended path. What do you think? You could also start a discussion thread on the dev list, since this is a user-facing API.
[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427738041

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

```diff
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
     }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-    FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+    private final String timerFamilyId;
+    private final K userKey;
+    private final TimeDomain timeDomain;
+    private final Instant elementTimestampOrTimerHoldTimestamp;
+    private final Instant elementTimestampOrTimerFireTimestamp;
+    private final BoundedWindow boundedWindow;
+    private final PaneInfo paneInfo;
+
+    FnApiTimerMap(
+        String timerFamilyId,
+        K userKey,
+        BoundedWindow boundedWindow,
+        Instant elementTimestampOrTimerHoldTimestamp,
+        Instant elementTimestampOrTimerFireTimestamp,
+        PaneInfo paneInfo) {
+      this.timerFamilyId = timerFamilyId;
+      this.userKey = userKey;
+      this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
+      this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
+      this.boundedWindow = boundedWindow;
+      this.paneInfo = paneInfo;
+
+      TimerFamilyDeclaration timerFamilyDeclaration =
+          doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+      this.timeDomain =
+          DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain();
+    }
 
     @Override
-    public void set(String timerId, Instant absoluteTime) {}
+    public void set(String dynamicTimerTag, Instant absoluteTime) {
```

Review comment: I believe we can always call `get()` first to access the FnApiTimer and then call its APIs. That is probably sufficient; adding more shortcuts only makes the API slightly more user-friendly.
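For readers following this API debate: Beam's `TimerMap` exposes both a `get(tag)` accessor and a `set(tag, time)` shortcut, and the thread above is about whether the shortcut pulls its weight. A self-contained sketch (not the Beam SDK; the names only mirror `TimerMap`) showing how the shortcut can simply delegate to `get()`:

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class TimerMapSketch {
  // Minimal stand-in for Beam's Timer: only the set(Instant) operation.
  interface Timer {
    void set(Instant absoluteTime);
  }

  // Minimal stand-in for Beam's TimerMap, tracking one deadline per tag.
  static class TimerMap {
    final Map<String, Instant> deadlines = new HashMap<>();

    // Returns a Timer bound to the given dynamic tag.
    Timer get(String dynamicTimerTag) {
      return absoluteTime -> deadlines.put(dynamicTimerTag, absoluteTime);
    }

    // The shortcut under discussion: equivalent to get(tag).set(time).
    void set(String dynamicTimerTag, Instant absoluteTime) {
      get(dynamicTimerTag).set(absoluteTime);
    }
  }

  public static void main(String[] args) {
    TimerMap timers = new TimerMap();
    timers.set("t1", Instant.EPOCH);                     // via the shortcut
    timers.get("t2").set(Instant.EPOCH.plusSeconds(60)); // via get() then set()
    System.out.println(timers.deadlines.size());         // prints 2
  }
}
```

Both call sites end up in the same place, which is why the disagreement above is purely about API surface, not behavior.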
[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427737289

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

```diff
@@ -962,16 +971,25 @@ private Progress getProgress() {
             .build());
   }
 
-  private void processTimer(String timerId, TimeDomain timeDomain, Timer timer) {
+  private void processTimer(
+      String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer timer) {
     currentTimer = timer;
     currentTimeDomain = timeDomain;
     onTimerContext = new OnTimerContext<>(timer.getUserKey());
+    String timerId =
+        timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)
```

Review comment: It will be ignored anyway; apparently only one of the timerId or the timerFamilyId takes effect. https://github.com/apache/beam/blob/591de3473144de54beef0932131025e2a4d8504b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L223
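The code under review distinguishes a plain timer id from a timer-family id purely by a string prefix (`TimerFamilyDeclaration.PREFIX`), then resolves the id against the matching declaration map. A standalone illustration of that dispatch, using an assumed `"tfs-"` prefix as a stand-in since the real constant's value is not shown in this thread:

```java
import java.util.Map;

public class TimerIdDispatch {
  // Stand-in for TimerFamilyDeclaration.PREFIX; its actual value is an
  // assumption made for this sketch.
  static final String FAMILY_PREFIX = "tfs-";

  // Stand-ins for doFnSignature.timerDeclarations() and
  // doFnSignature.timerFamilyDeclarations(): id -> time domain.
  static final Map<String, String> TIMER_DOMAINS = Map.of("myTimer", "EVENT_TIME");
  static final Map<String, String> FAMILY_DOMAINS = Map.of("tfs-myFamily", "PROCESSING_TIME");

  // Mirrors the if/else in the diff: the prefix decides which declaration
  // map the id is resolved against.
  static String timeDomainFor(String timerIdOrTimerFamilyId) {
    return timerIdOrTimerFamilyId.startsWith(FAMILY_PREFIX)
        ? FAMILY_DOMAINS.get(timerIdOrTimerFamilyId)
        : TIMER_DOMAINS.get(timerIdOrTimerFamilyId);
  }

  public static void main(String[] args) {
    System.out.println(timeDomainFor("myTimer"));      // resolved as a plain timer
    System.out.println(timeDomainFor("tfs-myFamily")); // resolved as a timer family
  }
}
```

The sketch also makes the reviewers' concern concrete: any id that happens to start with the family prefix is routed to the family map, so the prefix has to be reserved.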
[GitHub] [beam] tysonjh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms
tysonjh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427471127

## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java

```diff
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
```

Review comment: It's good practice to start all javadoc comments with a short summary fragment; there are more details in Google's Java style guide [1]. For example, I would phrase the summary fragment for this class as: "Batches input rows to reduce the number of requests sent to the Cloud DLP service." Would you please go through this CL and add such comments to public classes and methods? I personally like to add them to all classes, non-trivial methods, and tricky blocks of code, regardless of access modifiers. [1] https://google.github.io/styleguide/javaguide.html#s7.2-summary-fragment
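To make the reviewer's convention concrete, here is a hypothetical class (not from the PR) whose javadoc leads with the kind of short, third-person summary fragment the Google style guide recommends:

```java
/** Batches input rows to reduce the number of requests sent to a remote service. */
public class RequestBatcher {
  /** Returns the number of batches needed to send {@code rows} rows. */
  static int batchCount(int rows, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    return (rows + batchSize - 1) / batchSize; // ceiling division
  }

  public static void main(String[] args) {
    System.out.println(batchCount(2500, 1000)); // prints 3
  }
}
```

Note the fragments are sentence-case, end with a period, and read as "Does X" / "Returns X" rather than "This method does X", which is the style the linked guide describes.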
[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
pabloem commented on pull request #11596: URL: https://github.com/apache/beam/pull/11596#issuecomment-631224035 Run Java PostCommit
[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631223098 Run Java Flink PortableValidatesRunner Streaming
[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631223003 Run Java Flink PortableValidatesRunner Batch
[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631220097 Run Java Flink PortableValidatesRunner Batch
[GitHub] [beam] boyuanzz closed pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz closed pull request #11756: URL: https://github.com/apache/beam/pull/11756
[GitHub] [beam] boyuanzz commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631219846
[GitHub] [beam] boyuanzz commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427720451 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -962,16 +971,25 @@ private Progress getProgress() { .build()); } - private void processTimer(String timerId, TimeDomain timeDomain, Timer timer) { + private void processTimer( + String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer timer) { currentTimer = timer; currentTimeDomain = timeDomain; onTimerContext = new OnTimerContext<>(timer.getUserKey()); +String timerId = +timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX) Review comment: If the `timerIdOrTimerFamilyId ` is for a timer family, should the timerId be the `timer.dynamicTimerTag`? ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) { } } - private static class FnApiTimerMap implements TimerMap { -FnApiTimerMap() {} + private class FnApiTimerMap implements TimerMap { +private final String timerFamilyId; +private final K userKey; +private final TimeDomain timeDomain; +private final Instant elementTimestampOrTimerHoldTimestamp; +private final Instant elementTimestampOrTimerFireTimestamp; +private final BoundedWindow boundedWindow; +private final PaneInfo paneInfo; + +FnApiTimerMap( +String timerFamilyId, +K userKey, +BoundedWindow boundedWindow, +Instant elementTimestampOrTimerHoldTimestamp, +Instant elementTimestampOrTimerFireTimestamp, +PaneInfo paneInfo) { + this.timerFamilyId = timerFamilyId; + this.userKey = userKey; + this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp; + this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp; + this.boundedWindow = boundedWindow; + this.paneInfo = paneInfo; + + TimerFamilyDeclaration timerFamilyDeclaration = + 
doFnSignature.timerFamilyDeclarations().get(timerFamilyId); + this.timeDomain = Review comment: Similar to `FnApiTimer` above, we should have `timeDomain` from proto, right? ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception { // Extract out relevant TimerFamilySpec information in preparation for execution. for (Map.Entry entry : parDoPayload.getTimerFamilySpecsMap().entrySet()) { -String timerFamilyId = entry.getKey(); -TimeDomain timeDomain = -DoFnSignatures.getTimerSpecOrThrow( -doFnSignature.timerDeclarations().get(timerFamilyId), doFn) -.getTimeDomain(); +String timerIdOrTimerFamilyId = entry.getKey(); +TimeDomain timeDomain; +if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) { + timeDomain = Review comment: The `TimerFamilySpec` should have a `time_domain` field. Maybe we could do something similar to https://github.com/apache/beam/blob/1de50c348706ed25af2bab9c9477d7d4f36ef8bf/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java#L657-L668 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception { // Extract out relevant TimerFamilySpec information in preparation for execution. 
for (Map.Entry entry : parDoPayload.getTimerFamilySpecsMap().entrySet()) { -String timerFamilyId = entry.getKey(); -TimeDomain timeDomain = -DoFnSignatures.getTimerSpecOrThrow( -doFnSignature.timerDeclarations().get(timerFamilyId), doFn) -.getTimeDomain(); +String timerIdOrTimerFamilyId = entry.getKey(); +TimeDomain timeDomain; +if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) { + timeDomain = + DoFnSignatures.getTimerFamilySpecOrThrow( + doFnSignature.timerFamilyDeclarations().get(timerIdOrTimerFamilyId), doFn) + .getTimeDomain(); +} else { + timeDomain = + DoFnSignatures.getTimerSpecOrThrow( + doFnSignature.timerDeclarations().get(timerIdOrTimerFamilyId), doFn) + .getTimeDomain(); +} Coder> timerCoder = (Coder) rehydratedComponents.getCoder(entry.getValue().getTimerFamilyCoderId()); -timerFamilyInfosBuilder.put(timerFamilyId, KV.of(timeDomain, timerCoder)); +timerFamilyInfosBui
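The dispatch introduced in the diff above — treating a map entry as a timer family when its id carries the declaration prefix, and falling back to a plain timer declaration otherwise — can be sketched in plain Java. The prefix value `tfs-` and the helper names below are illustrative assumptions, not Beam's actual `TimerFamilyDeclaration.PREFIX`; the dynamic-tag fallback mirrors boyuanzz's question about using `timer.dynamicTimerTag` as the effective timer id:

```java
public class TimerIdDispatch {
  // Hypothetical stand-in for TimerFamilyDeclaration.PREFIX; the real constant differs.
  static final String TIMER_FAMILY_PREFIX = "tfs-";

  /**
   * Resolves the effective timer id: for a timer-family id, the dynamic timer
   * tag identifies the concrete timer; otherwise the id is used as-is.
   */
  static String resolveTimerId(String timerIdOrFamilyId, String dynamicTimerTag) {
    if (timerIdOrFamilyId.startsWith(TIMER_FAMILY_PREFIX)) {
      return dynamicTimerTag;
    }
    return timerIdOrFamilyId;
  }

  public static void main(String[] args) {
    // A family id resolves to the dynamic tag; a plain timer id passes through.
    System.out.println(resolveTimerId("tfs-myFamily", "tag1")); // tag1
    System.out.println(resolveTimerId("myTimer", "unused"));    // myTimer
  }
}
```

The same `startsWith` test then selects between `timerFamilyDeclarations()` and `timerDeclarations()` when looking up the time domain.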
[GitHub] [beam] henryken commented on pull request #11734: [BEAM-9679] Add Core Transforms section / GroupByKey lesson to the Go SDK katas
henryken commented on pull request #11734: URL: https://github.com/apache/beam/pull/11734#issuecomment-631216779 @damondouglas, please help to update the Stepik course. Afterwards, we can merge this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] jaketf edited a comment on pull request #11339: [BEAM-9468] Fhir io
jaketf edited a comment on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631203002 The issue with FhirIOReadIT seems to be some misuse of [TestPubsubSignal](https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.html). The [Example Job](https://pantheon.corp.google.com/dataflow/jobs/us-central1/2020-05-19_19_05_19-1057064265214622054?project=apache-beam-testing) clearly has elements added to the "waitForAnyMessage" task, but the success signal never gets published to the results topic. Notably, there are job-level warnings about metric descriptors and [warnings in the shuffle logs](https://pantheon.corp.google.com/logs/viewer?dateRangeEnd=2020-05-20T02:18:25.642Z&dateRangeStart=2020-05-20T02:05:21.347Z&expandAll=false&interval=CUSTOM&project=apache-beam-testing&resource=dataflow_step%2Fjob_id%2F2020-05-19_19_05_19-1057064265214622054&timestamp=2020-05-20T02:57:46.18700Z&logName=projects%2Fapache-beam-testing%2Flogs%2Fdataflow.googleapis.com%252Fshuffler&minLogLevel=0&customFacets=&limitCustomFacetWidth=true&scrollTimestamp=2020-05-20T02:15:27.999165390Z) which warn: ``` "Update range task returned 'invalid argument'. Assuming lost lease for work with id 5061980071068333770 (expiration time: 1589940982000, now: 1589940923590, full status: INVALID_ARGUMENT: Http(400) Bad Request). For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors." ``` The [docs](https://cloud.google.com/dataflow/docs/guides/common-errors#bad-request-shuffler-logs) say this can be ignored, but it smells suspicious here. This is orthogonal to the behavior being tested. Investigating other means of performing this test.
[GitHub] [beam] chamikaramj commented on a change in pull request #11360: [BEAM-9722] added SnowflakeIO with Read operation
chamikaramj commented on a change in pull request #11360: URL: https://github.com/apache/beam/pull/11360#discussion_r427718773 ## File path: sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/SnowflakeIO.java ## @@ -0,0 +1,735 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.snowflake; + +import static org.apache.beam.sdk.io.TextIO.readFiles; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import com.opencsv.CSVParser; +import com.opencsv.CSVParserBuilder; +import java.io.IOException; +import java.io.Serializable; +import java.security.PrivateKey; +import java.sql.Connection; +import java.sql.SQLException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import javax.sql.DataSource; +import net.snowflake.client.jdbc.SnowflakeBasicDataSource; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.io.FileIO; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.apache.beam.sdk.io.snowflake.credentials.KeyPairSnowflakeCredentials; +import org.apache.beam.sdk.io.snowflake.credentials.OAuthTokenSnowflakeCredentials; +import org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentials; +import org.apache.beam.sdk.io.snowflake.credentials.UsernamePasswordSnowflakeCredentials; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Wait; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * IO to read and write data on Snowflake. 
+ * + * SnowflakeIO uses a <a href="https://docs.snowflake.net/manuals/user-guide/jdbc.html">Snowflake + * JDBC</a> driver under the hood, but data isn't read/written using JDBC directly. Instead, + * SnowflakeIO uses dedicated COPY operations to read/write data from/to a cloud bucket. Currently + * only Google Cloud Storage is supported. + * + * To configure SnowflakeIO to read/write from your Snowflake instance, you have to provide a + * {@link DataSourceConfiguration} using {@link + * DataSourceConfiguration#create(SnowflakeCredentials)}, where {@link SnowflakeCredentials} might be + * created using {@link org.apache.beam.sdk.io.snowflake.credentials.SnowflakeCredentialsFactory}. + * Additionally, one of {@link DataSourceConfiguration#withServerName(String)} or {@link + * DataSourceConfiguration#withUrl(String)} must be used to tell SnowflakeIO which instance to use. + * + * There are also other options available to configure the connection to Snowflake: + * + * + * {@link DataSourceConfiguration#withWarehouse(String)} to specify which Warehouse to use + * {@link DataSourceConfiguration#withDatabase(String)} to specify which Database to connect + * to + * {@link DataSourceConfiguration#withSchema(String)} to specify which schema to use + * {@link DataSourceConfiguration#withRole(String)} to specify which role to use + * {@link DataSourceConfiguration#withLoginTimeout(Integer)} to specify the timeout for the + * login + * {@link DataSourceConfiguration#withPortNumber(Integer)} to specify a custom port of the Snowflake + * instance + * + * + * For example: + * + * {@code + * SnowflakeIO.DataSourceConfiguration dataSourceConfiguration = + * SnowflakeIO.DataSourceConfiguration.create(SnowflakeCredentialsFactory.of(options)) + * .withServerName(options.getServerName()) + * .withWarehouse(options.getWarehouse()) + * .withDatabase(options.getDatabase()) + * .withSchema(options.getSchema()); + * } + * + *
[GitHub] [beam] boyuanzz commented on pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker
boyuanzz commented on pull request #11715: URL: https://github.com/apache/beam/pull/11715#issuecomment-631204566 Run Java PreCommit
[GitHub] [beam] henryken commented on pull request #11736: Katas - Convert task description from HTML to Markdown
henryken commented on pull request #11736: URL: https://github.com/apache/beam/pull/11736#issuecomment-631201222 Thanks @pabloem!
[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
pabloem commented on pull request #11596: URL: https://github.com/apache/beam/pull/11596#issuecomment-631189470 Run Java PostCommit
[GitHub] [beam] pabloem commented on pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
pabloem commented on pull request #11596: URL: https://github.com/apache/beam/pull/11596#issuecomment-631189422 retest this please
[GitHub] [beam] omarismail94 commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build
omarismail94 commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427686703 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; -outputStream.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// PCOLLECTION: [3, row, 3.0] -// PCOLLECTION: [2, row, 2.0] -System.out.println("PCOLLECTION: " + input.getValues()); -return input; - } -})); +outputStream +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// PCOLLECTION: [3, row, 3.0] +// PCOLLECTION: [2, row, 2.0] +System.out.println("PCOLLECTION: " + input.getValues()); +return input; + } +})) +.setCoder(RowCoder.of(type)); // Case 2. run the query with SqlTransform.query over result PCollection of case 1. 
PCollection outputStream2 = PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream) .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2")); // print the output record of case 2; -outputStream2.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// CASE1_RESULT: [row, 5.0] -System.out.println("CASE1_RESULT: " + input.getValues()); -return input; - } -})); +outputStream2 +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// CASE1_RESULT: [row, 5.0] +System.out.println("CASE1_RESULT: " + input.getValues()); +return input; + } +})) +.setCoder( +RowCoder.of( +Schema.builder() +.addStringField("stringField") +.addDoubleField("doubleField") +.build())); Review comment: Actually, it is not due to the reduction in the number of fields, but the order in which the fields are selected in the SELECT statement. Here is the order it expects: Int, String, Double, and the fields that represent those types are c1, c2, c3. If your results print out of order, it fails with a `ClassCastException`. I tried this query and it failed: `select c2, sum(c1), sum(c3) from CASE1_RESULT group by c2`, but if I do `select sum(c1), c2, sum(c3) from CASE1_RESULT group by c2` it works! You can see that in the one that failed, c1's and c2's positions have switched, so the encoder trips up. What's cool is that you can see the results correctly calculated in `System.out.println("CASE1_RESULT: " + input.getValues());`, but when the result is encoded, the program throws an error because the results are out of order. I guess this is because it sees `.setRowSchema(type);`, and as the order of the schema is "Int, String, Double", the results have to abide by that rule. That's why it fails when we did `c2, sum(c3) from CASE1_RESULT group by c2`.
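The failure mode described above — a positional row coder casting each value to the schema type at the same index — can be reproduced with a small plain-Java sketch. The `encode` helper is hypothetical (a stand-in for what a schema-driven coder does positionally), not Beam's actual `RowCoder`:

```java
import java.util.Arrays;
import java.util.List;

public class RowOrderDemo {
  /** Casts each value to the schema type at the same position, as a positional coder would. */
  static void encode(List<Class<?>> schema, List<Object> values) {
    for (int i = 0; i < schema.size(); i++) {
      schema.get(i).cast(values.get(i)); // throws ClassCastException on a mismatch
    }
  }

  public static void main(String[] args) {
    // Schema order mirrors the example's c1 (Int), c2 (String), c3 (Double).
    List<Class<?>> schema = Arrays.asList(Integer.class, String.class, Double.class);
    encode(schema, Arrays.asList(2, "row", 2.0));   // matches: fine
    try {
      encode(schema, Arrays.asList("row", 2, 2.0)); // c2 before c1: fails
    } catch (ClassCastException e) {
      System.out.println("out-of-order fields: " + e.getMessage());
    }
  }
}
```

This is why `select sum(c1), c2, sum(c3)` works while `select c2, sum(c1), sum(c3)` fails against the same schema: the values are computed correctly either way, but the positional cast at encode time only succeeds when the SELECT order matches the schema order.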
[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427686974 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) * @param filter the filter */ ListHL7v2Messages(ValueProvider> hl7v2Stores, ValueProvider filter) { - this.hl7v2Stores = hl7v2Stores.get(); - this.filter = filter.get(); + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; +} + +/** + * Instantiates a new List hl 7 v 2 messages. + * + * @param hl7v2Stores the hl 7 v 2 stores + * @param filter the filter + * @param initialSplitDuration the initial split duration for sendTime dimension splits + */ +ListHL7v2Messages( +ValueProvider> hl7v2Stores, +ValueProvider filter, +Duration initialSplitDuration) { + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; + this.initialSplitDuration = initialSplitDuration; } +/** + * Instantiates a new List hl7v2 messages. + * + * @param hl7v2Stores the hl7v2 stores + */ ListHL7v2Messages(ValueProvider> hl7v2Stores) { - this.hl7v2Stores = hl7v2Stores.get(); + this.hl7v2Stores = hl7v2Stores; this.filter = null; } +/** + * Instantiates a new List hl7v2 messages. 
+ * + * @param hl7v2Stores the hl7v2 stores + * @param initialSplitDuration the initial split duration + */ +ListHL7v2Messages(ValueProvider> hl7v2Stores, Duration initialSplitDuration) { + this.hl7v2Stores = hl7v2Stores; + this.initialSplitDuration = initialSplitDuration; +} + @Override public PCollection expand(PBegin input) { return input - .apply(Create.of(this.hl7v2Stores)) - .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter))) + .apply(Create.ofProvider(this.hl7v2Stores, ListCoder.of(StringUtf8Coder.of( + .apply(FlatMapElements.into(TypeDescriptors.strings()).via((x) -> x)) + .apply(ParDo.of(new ListHL7v2MessagesFn(this.filter, initialSplitDuration))) .setCoder(new HL7v2MessageCoder()) // Break fusion to encourage parallelization of downstream processing. .apply(Reshuffle.viaRandomKey()); } } + /** + * Implemented as Splitable DoFn that claims millisecond resolutions of offset restrictions in the + * Message.sendTime dimension. + */ + @BoundedPerElement static class ListHL7v2MessagesFn extends DoFn { -private final String filter; +private static final Logger LOG = LoggerFactory.getLogger(ListHL7v2MessagesFn.class); +private ValueProvider filter; +// These control the initial restriction split which means that the list of integer pairs +// must comfortably fit in memory. +private static final Duration DEFAULT_DESIRED_SPLIT_DURATION = Duration.standardDays(1); +private static final Duration DEFAULT_MIN_SPLIT_DURATION = Duration.standardHours(1); +private Duration initialSplitDuration; +private Instant from; +private Instant to; Review comment: I don't think so; they don't get set until we make the earliest/latest sendTime query in `@GetInitialRestriction`.
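The splittable DoFn above carves the `Message.sendTime` range into initial sub-restrictions, with `DEFAULT_DESIRED_SPLIT_DURATION` of one day and `DEFAULT_MIN_SPLIT_DURATION` of one hour bounding the split so the list fits in memory. A plain-Java sketch of such a policy — an illustrative assumption, not the PR's actual splitting code, using `java.time` instead of Joda-Time:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class SendTimeSplitter {
  /**
   * Splits [from, to) into contiguous sub-ranges of at most desired length.
   * A trailing fragment shorter than min is absorbed into the previous range
   * rather than emitted on its own.
   */
  static List<Instant[]> split(Instant from, Instant to, Duration desired, Duration min) {
    List<Instant[]> ranges = new ArrayList<>();
    Instant start = from;
    while (start.isBefore(to)) {
      Instant end = start.plus(desired);
      // If the remainder after this split would be shorter than min, take it all now.
      if (end.plus(min).isAfter(to)) {
        end = to;
      }
      ranges.add(new Instant[] {start, end});
      start = end;
    }
    return ranges;
  }

  public static void main(String[] args) {
    Instant from = Instant.parse("2020-05-01T00:00:00Z");
    Instant to = Instant.parse("2020-05-03T12:30:00Z");
    for (Instant[] r : split(from, to, Duration.ofDays(1), Duration.ofHours(1))) {
      System.out.println(r[0] + " .. " + r[1]);
    }
  }
}
```

As the review comment notes, `from` and `to` here would only be known after the earliest/latest sendTime query runs in `@GetInitialRestriction`.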
[GitHub] [beam] santhh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms
santhh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427684818 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.auto.value.AutoValue; +import com.google.cloud.dlp.v2.DlpServiceClient; +import com.google.privacy.dlp.v2.ContentItem; +import com.google.privacy.dlp.v2.DeidentifyConfig; +import com.google.privacy.dlp.v2.DeidentifyContentRequest; +import com.google.privacy.dlp.v2.DeidentifyContentResponse; +import com.google.privacy.dlp.v2.FieldId; +import com.google.privacy.dlp.v2.InspectConfig; +import com.google.privacy.dlp.v2.ProjectName; +import com.google.privacy.dlp.v2.Table; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided + * settings. The transform supports both CSV-formatted input data and unstructured input. + * + * If the csvHeader property is set, csvDelimiter should also be set; otherwise the results will be + * incorrect. If csvHeader is not set, input is assumed to be unstructured. + * + * Either inspectTemplateName (String) or inspectConfig ({@link InspectConfig}) needs to be set. The + * same applies to deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}). + * + * The batch size defines, in bytes, how large the batches sent to DLP at once can be. + * + * The transform outputs {@link KV} of {@link String} (e.g. the filename) and {@link + * DeidentifyContentResponse}, which will contain a {@link Table} of results for the user to consume. 
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, DeidentifyContentResponse>>> {
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String deidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig deidentifyConfig();
+
+  @Nullable
+  public abstract PCollectionView<List<String>> csvHeader();
+
+  @Nullable
+  public abstract String csvDelimiter();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setCsvHeader(PCollectionView<List<String>> csvHeader);
+
+    public abstract Builder setCsvDelimiter(String delimiter);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract Builder setDeidentifyTemplateName(String deidentifyTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setDeidentifyConfig(DeidentifyConfig deidentifyConfig);
+
+    public abstract DLPDeidentifyText build();
+  }
+
+  public static DLPDeidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPDeidentifyText.Builder();
+  }
+
+  /**
+   * The transform batches the contents of input PCollection and then calls Cloud DLP service to
+   * perform the deidentification.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, DeidentifyContentResponse>> expand(
+      PCollection<KV<String, String>> input) {
+    return input
+        .apply(ParDo.of(new MapStringToDlpRow(csvDelimiter())))
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            "DLPDeidentify",
+            ParDo.of(
+                new DeidentifyTex
[GitHub] [beam] santhh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms
santhh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427683846 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java ##

Review comment: For de-id it's also the same as re-id: the de-id template is required, but inspect is optional.
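The configuration rule discussed in this thread (a de-identify template or config is required, inspect settings are optional, and a template name and an inline config are alternatives for each role) can be expressed as a small validation check. The sketch below is plain Python with illustrative names, not part of the Beam transform; treating template/config pairs as mutually exclusive is an assumption for the sketch, not something the quoted code enforces.

```python
# Illustrative sketch (not Beam code) of the configuration rule discussed above:
# a de-identify template or config is required, while inspect settings are optional.
# The "set only one" checks are an added assumption for clarity.

def validate_dlp_config(deidentify_template=None, deidentify_config=None,
                        inspect_template=None, inspect_config=None):
    """Return a list of configuration errors; an empty list means the config is usable."""
    errors = []
    if deidentify_template is None and deidentify_config is None:
        errors.append("either deidentifyTemplateName or deidentifyConfig must be set")
    if deidentify_template is not None and deidentify_config is not None:
        errors.append("set only one of deidentifyTemplateName and deidentifyConfig")
    # inspect settings are optional, but should not be given twice either
    if inspect_template is not None and inspect_config is not None:
        errors.append("set only one of inspectTemplateName and inspectConfig")
    return errors
```

Running the same check in the builder's `build()` would surface misconfiguration at pipeline construction time rather than at the first DLP call.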
[GitHub] [beam] santhh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms
santhh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427683019 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  public BatchRequestForDLP(Integer batchSize) {

Review comment: +1
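The batching pattern the quoted DoFn implements — bag elements in state until a byte budget is reached, then flush the bag as one request — can be sketched outside Beam in a few lines. The sketch below is standalone illustrative Python, not the Beam implementation; the budget value and names are made up for the example.

```python
# Standalone sketch of the byte-budget batching idea behind BatchRequestForDLP.
# Rows accumulate in a "bag"; once adding the next row would exceed the budget,
# the current bag is flushed as one batch. All names here are illustrative.

def batch_by_bytes(rows, max_batch_bytes):
    """Group serialized rows into batches no larger than max_batch_bytes each."""
    batch, batch_bytes = [], 0
    for row in rows:
        row_bytes = len(row.encode("utf-8"))
        if batch and batch_bytes + row_bytes > max_batch_bytes:
            yield batch          # flush the full bag as one DLP-style request
            batch, batch_bytes = [], 0
        batch.append(row)
        batch_bytes += row_bytes
    if batch:                    # flush the remainder (the event-timer case above)
        yield batch

batches = list(batch_by_bytes(["a" * 30, "b" * 30, "c" * 10], max_batch_bytes=50))
```

In the Beam version the bag lives in `@StateId("elementsBag")` and the trailing flush is driven by the event-time timer, so the pattern works per key and window rather than over one in-memory iterable.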
[GitHub] [beam] y1chi opened a new pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi opened a new pull request #11756: URL: https://github.com/apache/beam/pull/11756 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[GitHub] [beam] y1chi commented on pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on pull request #11756: URL: https://github.com/apache/beam/pull/11756#issuecomment-631169795 R: @boyuanzz This is ready for review.
[GitHub] [beam] santhh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms
santhh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427681778 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java ## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.auto.value.AutoValue; +import com.google.cloud.dlp.v2.DlpServiceClient; +import com.google.privacy.dlp.v2.ContentItem; +import com.google.privacy.dlp.v2.DeidentifyConfig; +import com.google.privacy.dlp.v2.FieldId; +import com.google.privacy.dlp.v2.InspectConfig; +import com.google.privacy.dlp.v2.ProjectName; +import com.google.privacy.dlp.v2.ReidentifyContentRequest; +import com.google.privacy.dlp.v2.ReidentifyContentResponse; +import com.google.privacy.dlp.v2.Table; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} connecting to Cloud DLP and inspecting text for identifying data according + * to provided settings. + * + * Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set, the + * same goes for reidentifyTemplateName or reidentifyConfig. + * + * Batch size defines how big are batches sent to DLP at once in bytes. 
+ */
+@Experimental
+@AutoValue
+public abstract class DLPReidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, ReidentifyContentResponse>>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(DLPInspectText.class);
+
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String reidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig reidentifyConfig();
+
+  @Nullable
+  public abstract String csvDelimiter();
+
+  @Nullable
+  public abstract PCollectionView<List<String>> csvHeaders();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setReidentifyConfig(DeidentifyConfig deidentifyConfig);
+
+    public abstract Builder setReidentifyTemplateName(String deidentifyTemplateName);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setCsvHeaders(PCollectionView<List<String>> csvHeaders);
+
+    public abstract Builder setCsvDelimiter(String delimiter);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract DLPReidentifyText build();
+  }
+
+  public static DLPReidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPReidentifyText.Builder();
+  }
+
+  @Override
+  public PCollection<KV<String, ReidentifyContentResponse>> expand(
+      PCollection<KV<String, String>> input) {
+    return input
+        .apply(ParDo.of(new MapStringToDlpRow(csvDelimiter())))
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            "DLPDeidentify",
+            ParDo.of(
+                new ReidentifyText(
+                    projectId(),
+                    inspectTemplateName(),
+                    reidentifyTemplateName(),
+                    inspectConfig(),
+                    reidentifyConfig(),
+                    csvHeaders())));
+  }
+
+  public static class ReidentifyText
+      extends DoFn<KV<String, Iterable<Table.Row>>, KV<String, ReidentifyContentResponse>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String reidentifyTemplateName;
+    private final InspectConfig inspectCon
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build
TheNeuralBit commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427681580 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ##
@@ -66,38 +68,47 @@ public static void main(String[] args) {
inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));

// print the output record of case 1;
-outputStream.apply(
-    "log_result",
-    MapElements.via(
-        new SimpleFunction<Row, Row>() {
-          @Override
-          public Row apply(Row input) {
-            // expect output:
-            //   PCOLLECTION: [3, row, 3.0]
-            //   PCOLLECTION: [2, row, 2.0]
-            System.out.println("PCOLLECTION: " + input.getValues());
-            return input;
-          }
-        }));
+outputStream
+    .apply(
+        "log_result",
+        MapElements.via(
+            new SimpleFunction<Row, Row>() {
+              @Override
+              public Row apply(Row input) {
+                // expect output:
+                //   PCOLLECTION: [3, row, 3.0]
+                //   PCOLLECTION: [2, row, 2.0]
+                System.out.println("PCOLLECTION: " + input.getValues());
+                return input;
+              }
+            }))
+    .setCoder(RowCoder.of(type));

// Case 2. run the query with SqlTransform.query over result PCollection of case 1.
PCollection<Row> outputStream2 =
    PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
        .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2"));

// print the output record of case 2;
-outputStream2.apply(
-    "log_result",
-    MapElements.via(
-        new SimpleFunction<Row, Row>() {
-          @Override
-          public Row apply(Row input) {
-            // expect output:
-            //   CASE1_RESULT: [row, 5.0]
-            System.out.println("CASE1_RESULT: " + input.getValues());
-            return input;
-          }
-        }));
+outputStream2
+    .apply(
+        "log_result",
+        MapElements.via(
+            new SimpleFunction<Row, Row>() {
+              @Override
+              public Row apply(Row input) {
+                // expect output:
+                //   CASE1_RESULT: [row, 5.0]
+                System.out.println("CASE1_RESULT: " + input.getValues());
+                return input;
+              }
+            }))
+    .setCoder(
+        RowCoder.of(
+            Schema.builder()
+                .addStringField("stringField")
+                .addDoubleField("doubleField")
+                .build()));

Review comment: oh for this call you will need to use
```
Schema.builder()
    .addStringField("stringField")
    .addDoubleField("doubleField")
    .build()
```
like you had in the setCoder call
[GitHub] [beam] santhh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms
santhh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427681156 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java ##

+            "DLPDeidentify",

Review comment: should this be re-identify?
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR
TheNeuralBit commented on a change in pull request #11755: URL: https://github.com/apache/beam/pull/11755#discussion_r427681057 ## File path: release/src/main/scripts/mass_comment.py ##
@@ -0,0 +1,141 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Script for mass-commenting Jenkins test triggers on a Beam PR."""
+
+import itertools
+import os
+import socket
+import sys
+import time
+import traceback
+import re
+import requests
+from datetime import datetime
+
+
+COMMENTS_TO_ADD = [
+  "Run Go PostCommit",
+  "Run Java PostCommit",
+  "Run Java PortabilityApi PostCommit",
+  "Run Java Flink PortableValidatesRunner Batch",
+  "Run Java Flink PortableValidatesRunner Streaming",
+  "Run Apex ValidatesRunner",
+  "Run Dataflow ValidatesRunner",
+  "Run Flink ValidatesRunner",
+  "Run Gearpump ValidatesRunner",
+  "Run Dataflow PortabilityApi ValidatesRunner",
+  "Run Samza ValidatesRunner",
+  "Run Spark ValidatesRunner",
+  "Run Python Dataflow ValidatesContainer",
+  "Run Python Dataflow ValidatesRunner",
+  "Run Python 3.5 Flink ValidatesRunner",
+  "Run Python 2 PostCommit",
+  "Run Python 3.5 PostCommit",
+  "Run SQL PostCommit",
+  "Run Go PreCommit",
+  "Run Java PreCommit",
+  "Run Java_Examples_Dataflow PreCommit",
+  "Run JavaPortabilityApi PreCommit",
+  "Run Portable_Python PreCommit",
+  "Run PythonLint PreCommit",
+  "Run Python PreCommit",
+  "Run Python DockerBuild PreCommit"
+]

Review comment: Should we also remove the duplicate list from [verify_release_build.sh](https://github.com/apache/beam/blob/master/release/src/main/scripts/verify_release_build.sh#L43) as part of this PR? I'm not really clear on where this list comes from. Is the goal to launch every single Jenkins job?
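The script above has to post each trigger phrase as its own comment on the PR through the GitHub API. The helper below only builds the request URL and JSON payload for each phrase — a dry-run sketch that makes no network call; the endpoint shape is the standard GitHub issue-comments API, and the function name is made up for illustration (the real `mass_comment.py` uses the `requests` library to send them).

```python
import json

# Dry-run sketch of what mass_comment.py must produce for each trigger phrase:
# one POST to the GitHub issue-comments endpoint per phrase. Building (not
# sending) the requests keeps this testable without credentials or network.

GITHUB_API = "https://api.github.com"

def build_comment_requests(repo, pr_number, phrases):
    """Return (url, json_body) pairs, one per Jenkins trigger phrase."""
    url = f"{GITHUB_API}/repos/{repo}/issues/{pr_number}/comments"
    return [(url, json.dumps({"body": phrase})) for phrase in phrases]

reqs = build_comment_requests("apache/beam", 11755, COMMENTS_TO_ADD if "COMMENTS_TO_ADD" in dir() else ["Run Java PreCommit"])
```

Rate limiting matters here: posting the full trigger list in a tight loop can hit GitHub's secondary rate limits, which is presumably why the real script imports `time`.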
[GitHub] [beam] omarismail94 commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build
omarismail94 commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427678727 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ##

Review comment: This is the part of the stack trace that makes me think that:
```
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.beam.sdk.coders.VarIntCoder.encode(VarIntCoder.java:33)
    at org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:270)
    at org.apache.beam.sdk.coders.Coder$ByteBuddy$E99UrF3W.encode(Unknown Source)
    at org.apache.beam.sdk.coders.Coder$ByteBuddy$E99UrF3W.encode(Unknown Source)
    at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:115)
```
[GitHub] [beam] y1chi closed pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi closed pull request #11753: URL: https://github.com/apache/beam/pull/11753
[GitHub] [beam] omarismail94 commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build
omarismail94 commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427678033 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; -outputStream.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// PCOLLECTION: [3, row, 3.0] -// PCOLLECTION: [2, row, 2.0] -System.out.println("PCOLLECTION: " + input.getValues()); -return input; - } -})); +outputStream +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// PCOLLECTION: [3, row, 3.0] +// PCOLLECTION: [2, row, 2.0] +System.out.println("PCOLLECTION: " + input.getValues()); +return input; + } +})) +.setCoder(RowCoder.of(type)); // Case 2. run the query with SqlTransform.query over result PCollection of case 1. 
PCollection outputStream2 = PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream) .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2")); // print the output record of case 2; -outputStream2.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// CASE1_RESULT: [row, 5.0] -System.out.println("CASE1_RESULT: " + input.getValues()); -return input; - } -})); +outputStream2 +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// CASE1_RESULT: [row, 5.0] +System.out.println("CASE1_RESULT: " + input.getValues()); +return input; + } +})) +.setCoder( +RowCoder.of( +Schema.builder() +.addStringField("stringField") +.addDoubleField("doubleField") +.build())); Review comment: I tried `setRowSchema(type)` and it failed with `: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer`. I think it is inferring the schema as 3 fields, but the result only returns two fields, and that's why it throws the error This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
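The mismatch described in this comment can be modeled outside Beam with toy per-field coders (all names below are hypothetical, not Beam's API): applying the three-field input schema `type` to the two-field aggregate result pairs the string value "row" with an integer coder, mirroring the `ClassCastException` in the stack trace above.

```python
import struct

# Toy per-field coders (hypothetical, not Beam's API). The int coder raises
# on a value of the wrong type, mirroring VarIntCoder's ClassCastException.
def encode_int(v):
    if not isinstance(v, int):
        raise TypeError(f"cannot encode {v!r} as int")
    return v.to_bytes(4, "big")

def encode_str(v):
    return v.encode("utf-8")

def encode_float(v):
    return struct.pack(">d", v)

def encode_row(row, field_coders):
    # zip pairs each value with the coder for that field position
    return b"".join(coder(v) for coder, v in zip(field_coders, row))

three_field_schema = [encode_int, encode_str, encode_float]  # like `type`: c1, c2, c3
two_field_schema = [encode_str, encode_float]                # c2, sum(c3)

case1_result = ("row", 5.0)  # the CASE1_RESULT output: [row, 5.0]
encode_row(case1_result, two_field_schema)     # encodes fine
# encode_row(case1_result, three_field_schema) # TypeError: "row" hits the int coder
```

This is why the PR builds a fresh two-field schema for the aggregate output instead of reusing `type`.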
[GitHub] [beam] omarismail94 commented on pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build
omarismail94 commented on pull request #11754: URL: https://github.com/apache/beam/pull/11754#issuecomment-631165775 > Thank you @omarismail94! > > We should probably be running this continuously to make sure we don't break it again. Would you mind adding the gradle task for this to the SQL preCommit [here](https://github.com/apache/beam/blob/master/build.gradle#L154)? That way it will run before we merge any PR that affects SQL. I will add both this and runPojoExample to the SQL preCommit.
[GitHub] [beam] omarismail94 commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build
omarismail94 commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427677681 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; -outputStream.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// PCOLLECTION: [3, row, 3.0] -// PCOLLECTION: [2, row, 2.0] -System.out.println("PCOLLECTION: " + input.getValues()); -return input; - } -})); +outputStream +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// PCOLLECTION: [3, row, 3.0] +// PCOLLECTION: [2, row, 2.0] +System.out.println("PCOLLECTION: " + input.getValues()); +return input; + } +})) +.setCoder(RowCoder.of(type)); Review comment: I can do that. I did `setRowSchema(type)` and it worked! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ibzib opened a new pull request #11755: [BEAM-10038] Add script to mass-comment Jenkins triggers on PR
ibzib opened a new pull request #11755: URL: https://github.com/apache/beam/pull/11755 @Ardagan wrote most of this script a while back, I just generalized it a bit. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch): [build-status badge table for Go/Java/Python across the Apex, Dataflow, Flink, Gearpump, Samza, and Spark runners omitted; truncated in the archive]
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build
TheNeuralBit commented on a change in pull request #11754: URL: https://github.com/apache/beam/pull/11754#discussion_r427674472 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; -outputStream.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// PCOLLECTION: [3, row, 3.0] -// PCOLLECTION: [2, row, 2.0] -System.out.println("PCOLLECTION: " + input.getValues()); -return input; - } -})); +outputStream +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// PCOLLECTION: [3, row, 3.0] +// PCOLLECTION: [2, row, 2.0] +System.out.println("PCOLLECTION: " + input.getValues()); +return input; + } +})) +.setCoder(RowCoder.of(type)); // Case 2. run the query with SqlTransform.query over result PCollection of case 1. 
PCollection outputStream2 = PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream) .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2")); // print the output record of case 2; -outputStream2.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// CASE1_RESULT: [row, 5.0] -System.out.println("CASE1_RESULT: " + input.getValues()); -return input; - } -})); +outputStream2 +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// CASE1_RESULT: [row, 5.0] +System.out.println("CASE1_RESULT: " + input.getValues()); +return input; + } +})) +.setCoder( +RowCoder.of( +Schema.builder() +.addStringField("stringField") +.addDoubleField("doubleField") +.build())); Review comment: Here as well ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ## @@ -66,38 +68,47 @@ public static void main(String[] args) { inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1")); // print the output record of case 1; -outputStream.apply( -"log_result", -MapElements.via( -new SimpleFunction() { - @Override - public Row apply(Row input) { -// expect output: -// PCOLLECTION: [3, row, 3.0] -// PCOLLECTION: [2, row, 2.0] -System.out.println("PCOLLECTION: " + input.getValues()); -return input; - } -})); +outputStream +.apply( +"log_result", +MapElements.via( +new SimpleFunction() { + @Override + public Row apply(Row input) { +// expect output: +// PCOLLECTION: [3, row, 3.0] +// PCOLLECTION: [2, row, 2.0] +System.out.println("PCOLLECTION: " + input.getValues()); +return input; + } +})) +.setCoder(RowCoder.of(type)); Review comment: could you change this to `withRowSchema(type)`? It does the same thing, but it's less verbose This is an automated message from the Apache Git Service. 
[GitHub] [beam] omarismail94 opened a new pull request #11754: [BEAM-10037] BeamSqlExample.java fails to build
omarismail94 opened a new pull request #11754: URL: https://github.com/apache/beam/pull/11754 R: @TheNeuralBit In the `BeamSqlExample.java` class, the instructions state that to run the example, use: `./gradlew :sdks:java:extensions:sql:runBasicExample`. I tried this and the build failed due to `java.lang.IllegalStateException: Unable to return a default Coder`. I fixed this by setting the Coder for both anon transforms.
[GitHub] [beam] y1chi commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631157957 Sorry, I should have marked this PR as a draft, as I'm still testing it. Expecting a couple more minor fixes.
[GitHub] [beam] boyuanzz commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631153847 Run Java Flink PortableValidatesRunner Batch
[GitHub] [beam] ibzib commented on pull request #11403: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch
ibzib commented on pull request #11403: URL: https://github.com/apache/beam/pull/11403#issuecomment-631152816 test
[GitHub] [beam] ibzib commented on pull request #11403: [DO NOT MERGE] Run all PostCommit and PreCommit Tests against Release Branch
ibzib commented on pull request #11403: URL: https://github.com/apache/beam/pull/11403#issuecomment-631149865 test
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types
TheNeuralBit commented on a change in pull request #11701: URL: https://github.com/apache/beam/pull/11701#discussion_r427655332 ## File path: sdks/python/apache_beam/coders/row_coder.py ## @@ -134,19 +134,18 @@ def __init__(self, schema, components): def encode_to_stream(self, value, out, nested): nvals = len(self.schema.fields) self.SIZE_CODER.encode_to_stream(nvals, out, True) -attrs = [getattr(value, f.name) for f in self.schema.fields] Review comment: Yeah, that's right. Right now it should only be possible to get here with a NamedTuple instance, so it should be safe. Looking forward to the day when more types might go through this code. I kind of like the idea of using `tuple` as the "base" schema type in Python (i.e. the type we must be able to convert to/from for use in the row coder). Relying on attributes isn't great, since it limits us to field names that are valid Python identifiers. All that being said, I'd be fine dropping this part of the change for now. Renaming the special `id` field also fixes the bug.
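The trade-off discussed here, attribute lookup versus positional access when encoding row fields, can be sketched in plain Python (an illustration only, not the actual row coder code):

```python
from typing import NamedTuple

class Person(NamedTuple):
    id: int
    name: str

def fields_by_attr(value, names):
    # Attribute lookup: requires field names to be valid Python identifiers
    # that actually exist as attributes on the value.
    return [getattr(value, n) for n in names]

def fields_by_index(value):
    # Positional lookup: works for any tuple-like row; field names never matter.
    return list(value)

p = Person(id=7, name="ada")
# For a NamedTuple the two approaches agree:
assert fields_by_attr(p, ["id", "name"]) == fields_by_index(p) == [7, "ada"]

# A bare tuple carries the values but not the named attributes:
plain = (7, "ada")
assert fields_by_index(plain) == [7, "ada"]
# fields_by_attr(plain, ["id", "name"]) would raise AttributeError
```

Positional access is what lets row types beyond NamedTuple (including field names that are not valid identifiers) flow through the same encoding path.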
[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427654986 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -472,24 +551,118 @@ public void initClient() throws IOException { this.client = new HttpHealthcareApiClient(); } +@GetInitialRestriction +public OffsetRange getEarliestToLatestRestriction(@Element String hl7v2Store) +throws IOException { + from = this.client.getEarliestHL7v2SendTime(hl7v2Store, this.filter.get()); + // filters are [from, to) to match logic of OffsetRangeTracker but need latest element to be + // included in results set to add an extra ms to the upper bound. + to = this.client.getLatestHL7v2SendTime(hl7v2Store, this.filter.get()).plus(1); + return new OffsetRange(from.getMillis(), to.getMillis()); +} + +@NewTracker +public OffsetRangeTracker newTracker(@Restriction OffsetRange timeRange) { + return timeRange.newTracker(); +} + +@SplitRestriction +public void split(@Restriction OffsetRange timeRange, OutputReceiver out) { + List splits = + timeRange.split(initialSplitDuration.getMillis(), DEFAULT_MIN_SPLIT_DURATION.getMillis()); + Instant from = Instant.ofEpochMilli(timeRange.getFrom()); + Instant to = Instant.ofEpochMilli(timeRange.getTo()); + Duration totalDuration = new Duration(from, to); + LOG.info( + String.format( + "splitting initial sendTime restriction of [minSendTime, now): [%s,%s), " + + "or [%s, %s). \n" + + "total days: %s \n" + + "into %s splits. \n" + + "Last split: %s", + from, + to, + timeRange.getFrom(), + timeRange.getTo(), + totalDuration.getStandardDays(), + splits.size(), + splits.get(splits.size() - 1).toString())); + + for (OffsetRange s : splits) { +out.output(s); + } +} + /** * List messages. 
* - * @param context the context + * @param hl7v2Store the HL7v2 store to list messages from * @throws IOException the io exception */ @ProcessElement -public void listMessages(ProcessContext context) throws IOException { - String hl7v2Store = context.element(); - // Output all elements of all pages. +public void listMessages( +@Element String hl7v2Store, +RestrictionTracker tracker, +OutputReceiver outputReceiver) +throws IOException { + OffsetRange currentRestriction = (OffsetRange) tracker.currentRestriction(); + Instant startRestriction = Instant.ofEpochMilli(currentRestriction.getFrom()); + Instant endRestriction = Instant.ofEpochMilli(currentRestriction.getTo()); HttpHealthcareApiClient.HL7v2MessagePages pages = - new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store, this.filter); + new HttpHealthcareApiClient.HL7v2MessagePages( + client, hl7v2Store, startRestriction, endRestriction, filter.get(), "sendTime"); long reqestTime = Instant.now().getMillis(); - for (Stream page : pages) { + long lastClaimedMilliSecond; + Instant cursor; + boolean hangingClaim = false; // flag if the claimed ms spans spills over to the next page. + for (List page : pages) { // loop over pages. +int i = 0; +HL7v2Message msg = page.get(i); +while (i < page.size()) { // loop over messages in page + cursor = Instant.parse(msg.getSendTime()); + lastClaimedMilliSecond = cursor.getMillis(); + LOG.info( + String.format( + "initial claim for page %s lastClaimedMilliSecond = %s", + i, lastClaimedMilliSecond)); + if (hangingClaim || tracker.tryClaim(lastClaimedMilliSecond)) { +// This means we have claimed an entire millisecond we need to make sure that we +// process all messages for this millisecond because sendTime is allegedly nano second +// resolution. 
+// https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.hl7V2Stores.messages#Message +while (cursor.getMillis() == lastClaimedMilliSecond +&& i < page.size()) { // loop over messages in millisecond. + outputReceiver.output(msg); Review comment: Thanks for suggestion. In the interest of time, can I punt this to a future PR? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache
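The claim-a-millisecond pattern this diff implements — claim each send-time millisecond once, then emit every message in it, because sendTime may have sub-millisecond resolution — can be sketched with a simplified tracker (all names below are hypothetical, not the HL7v2IO classes):

```python
class SimpleTracker:
    """Minimal stand-in for an offset-range tracker over [start, end) milliseconds."""
    def __init__(self, start, end):
        self.end = end
        self.last_claimed = None

    def try_claim(self, position):
        # A claim succeeds only while the position is inside the restriction.
        if position >= self.end:
            return False
        self.last_claimed = position
        return True

def list_messages(messages, tracker):
    # messages: (send_time_micros, payload) pairs sorted by send time
    out = []
    claimed_ms = None
    for send_micros, payload in messages:
        ms = send_micros // 1000
        if ms != claimed_ms:
            # New millisecond: must claim it before emitting anything in it.
            if not tracker.try_claim(ms):
                break
            claimed_ms = ms
        out.append(payload)  # every message in a claimed millisecond is emitted
    return out

msgs = [(1000_000, "a"), (1000_500, "b"), (1001_000, "c")]
```

With a restriction ending at millisecond 1001, "a" and "b" (both in millisecond 1000) are emitted and "c" is left for whoever owns the rest of the range.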
[GitHub] [beam] boyuanzz commented on a change in pull request #11715: [BEAM-9977] Implement GrowableOffsetRangeTracker
boyuanzz commented on a change in pull request #11715: URL: https://github.com/apache/beam/pull/11715#discussion_r427654115 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.splittabledofn; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.io.range.OffsetRange; + +/** + * An {@link OffsetRangeTracker} for tracking a growable offset range. {@code Long.MAX_VALUE} is + * used as end range to indicate the possibility of infinity. + * + * An offset range is considered growable when the end offset could grow (or change) during + * execution time (e.g., Kafka topic partition offset, appended file, ...). + * + * The growable range is marked as done by claiming {@code Long.MAX_VALUE}. 
+ */ +@Experimental(Kind.SPLITTABLE_DO_FN) +public class GrowableOffsetRangeTracker extends OffsetRangeTracker { + /** + * An interface that should be implemented to fetch estimated end offset of the range. + * + * {@code estimateRangeEnd} is called to give the end offset when {@code trySplit} or {@code + * getProgress} is invoked. The end offset is exclusive for the range. The estimated end is not + * necessary to increase monotonically as it will only be taken into computation when the estimate + * end is larger than the current position. When returning {@code Long.MAX_VALUE} as estimate, it + * means the largest possible position for the range is {@code Long.MAX_VALUE - 1}. If there is + * not an estimate yet, {@code Long.MIN_VALUE} should be returned, where estimated end will not + * effect progress and split. + * + * Having a good estimate is important for providing a good signal of progress and splitting at + * a proper position. + * + * If {@code estimate()} is expensive to call, please consider wrapping the implementation with + * {@code Suppliers.memoizeWithExpiration} as an optimization. + */ + @FunctionalInterface + public interface RangeEndEstimator { +long estimate(); + } + + private final RangeEndEstimator rangeEndEstimator; + + public GrowableOffsetRangeTracker(long start, RangeEndEstimator rangeEndEstimator) { +super(new OffsetRange(start, Long.MAX_VALUE)); +this.rangeEndEstimator = checkNotNull(rangeEndEstimator); + } + + @Override + public SplitResult trySplit(double fractionOfRemainder) { +// If current tracking range is no longer growable, split it as a normal range. +if (range.getTo() != Long.MAX_VALUE || range.getTo() == range.getFrom()) { + return super.trySplit(fractionOfRemainder); +} +// If current range has been done, there is no more space to split. +if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) { + return null; +} +double cur = Review comment: Using `BigDecimal` in the latest revision. Thanks for your help! 
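The split arithmetic under review above (where the reviewer settled on `BigDecimal` to avoid long overflow) can be illustrated with a rough Python sketch. The helper name and signature here are hypothetical, not Beam's Java implementation; Python's unbounded ints sidestep the overflow that forces `BigDecimal` in Java.

```python
LONG_MAX = 2**63 - 1  # stand-in for Java's Long.MAX_VALUE

def try_split_growable(last_attempted, estimated_end, fraction):
    """Sketch: split a growable [start, +inf) offset range at `fraction`
    of the remaining work, per the GrowableOffsetRangeTracker contract
    described in the javadoc above (estimate only counts when it is
    ahead of the current position)."""
    if last_attempted == LONG_MAX:
        return None  # range already claimed to the end; nothing to split
    # An estimate behind the current position is stale and is ignored.
    end = max(estimated_end, last_attempted + 1)
    # Split at least one offset past the last attempted position.
    return last_attempted + max(1, int((end - last_attempted) * fraction))
```

The primary would keep `[start, split_pos)` and the residual `[split_pos, +inf)`; Beam's actual Java code performs this computation with `BigDecimal` so positions near `Long.MAX_VALUE` cannot overflow.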
[GitHub] [beam] boyuanzz commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner
boyuanzz commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631136771 retest this please
[GitHub] [beam] boyuanzz commented on pull request #11642: Replace call to .checkpoint() in SDF direct runner to .try_claim(0)
boyuanzz commented on pull request #11642: URL: https://github.com/apache/beam/pull/11642#issuecomment-631136611 Hi Ashwin, do you want me to start to review now?
[GitHub] [beam] lukecwik commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner
lukecwik commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631133371 retest this please
[GitHub] [beam] pabloem merged pull request #11736: Katas - Convert task description from HTML to Markdown
pabloem merged pull request #11736: URL: https://github.com/apache/beam/pull/11736
[GitHub] [beam] pabloem commented on pull request #11736: Katas - Convert task description from HTML to Markdown
pabloem commented on pull request #11736: URL: https://github.com/apache/beam/pull/11736#issuecomment-63117 alright RAT passing. Merging...
[GitHub] [beam] robertwb commented on a change in pull request #11701: [BEAM-9899] Fix some issues around storing schema `id` on user types
robertwb commented on a change in pull request #11701: URL: https://github.com/apache/beam/pull/11701#discussion_r427648180 ## File path: sdks/python/apache_beam/coders/row_coder.py ## @@ -134,19 +134,18 @@ def __init__(self, schema, components): def encode_to_stream(self, value, out, nested): nvals = len(self.schema.fields) self.SIZE_CODER.encode_to_stream(nvals, out, True) -attrs = [getattr(value, f.name) for f in self.schema.fields] Review comment: This forces the value to be an iterable, rather than just having the right fields, right? Are we sure we want to do that?
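robertwb's point can be seen in a small standalone sketch (illustrative types only, not apache_beam's actual RowCoder): reading fields with `getattr` only requires the value to *have* the schema's field names as attributes, while iterating requires the value itself to be a sequence in schema order.

```python
from collections import namedtuple

# Minimal stand-ins for a schema with named fields (hypothetical types).
Schema = namedtuple("Schema", ["fields"])
Field = namedtuple("Field", ["name"])

def fields_by_name(value, schema):
    # Works for any object exposing the schema's field names as attributes.
    return [getattr(value, f.name) for f in schema.fields]

def fields_by_iteration(value, schema):
    # Requires `value` itself to be iterable in schema order.
    vals = list(value)
    if len(vals) != len(schema.fields):
        raise ValueError("value does not match schema arity")
    return vals

class Plain:
    """Has the right fields but is not iterable."""
    def __init__(self, a, b):
        self.a, self.b = a, b
```

A `Plain(1, 2)` instance encodes fine under `fields_by_name` but raises `TypeError` under `fields_by_iteration`, which is the behavioral tightening the review questions.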
[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631132405 Retest this please
[GitHub] [beam] robertwb commented on a change in pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image
robertwb commented on a change in pull request #11740: URL: https://github.com/apache/beam/pull/11740#discussion_r427646588 ## File path: sdks/python/apache_beam/runners/dataflow/internal/apiclient.py ## @@ -310,15 +312,15 @@ def __init__( environment_payload = proto_utils.parse_Bytes( environment.payload, beam_runner_api_pb2.DockerPayload) container_image_url = environment_payload.container_image -if container_image_url == pipeline_sdk_container_image: - # This was already added +if container_image_url in already_added_containers: + # Do not add the pipeline environment again. Review comment: Perhaps also add a comment that currently dataflow stages all dependencies to all environments?
[GitHub] [beam] aaltay commented on a change in pull request #11744: [BEAM-10007] Handle ValueProvider pipeline options in PortableRunner
aaltay commented on a change in pull request #11744: URL: https://github.com/apache/beam/pull/11744#discussion_r427645484 ## File path: sdks/python/apache_beam/runners/portability/portable_runner.py ## @@ -164,10 +165,19 @@ def add_runner_options(parser): all_options = self.options.get_all_options( add_extra_args_fn=add_runner_options, retain_unknown_options=self._retain_unknown_options) + # TODO: Define URNs for options. # convert int values: https://issues.apache.org/jira/browse/BEAM-5509 +def convert_pipeline_option_value(v): + if type(v) == int: Review comment: Interesting. For this PR, could you move that comment closer to the if type(v) == int line. > I can look into fixing that upstream so we can get rid of the special case here. If you have time, maybe file a bug. I do not think this is a very high priority for us. > I don't think it would be good to call str(v) on everything I agree. I was trying to understand why are we doing this.
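The special case discussed above (stringifying int option values before sending them to the job service, per BEAM-5509) can be sketched as follows. The `type(v) == int` test comes from the diff itself; the rationale about bools in the comment is my assumption, not stated in the thread.

```python
def convert_pipeline_option_value(v):
    # Only plain ints are converted to strings (see BEAM-5509).
    # Using `type(v) == int` rather than isinstance() presumably keeps
    # bools (an int subclass) and other int-like subtypes untouched --
    # an assumption, not confirmed in the review thread.
    if type(v) == int:
        return str(v)
    return v
```

Everything else (floats, strings, bools, None) passes through unchanged, which is why calling `str(v)` on everything was rejected in the discussion.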
[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427643888 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. Review comment: note to self: remove reference to "dynamically rebalance" as this is not yet supported by dataflow runner.
[GitHub] [beam] tysonjh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms
tysonjh commented on a change in pull request #11566: URL: https://github.com/apache/beam/pull/11566#discussion_r427403454 ## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.auto.value.AutoValue; +import com.google.cloud.dlp.v2.DlpServiceClient; +import com.google.privacy.dlp.v2.ContentItem; +import com.google.privacy.dlp.v2.DeidentifyConfig; +import com.google.privacy.dlp.v2.DeidentifyContentRequest; +import com.google.privacy.dlp.v2.DeidentifyContentResponse; +import com.google.privacy.dlp.v2.FieldId; +import com.google.privacy.dlp.v2.InspectConfig; +import com.google.privacy.dlp.v2.ProjectName; +import com.google.privacy.dlp.v2.Table; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided + * settings. The transform supports both CSV formatted input data and unstructured input. + * + * If the csvHeader property is set, csvDelimiter also should be, else the results will be + * incorrect. If csvHeader is not set, input is assumed to be unstructured. + * + * Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The + * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}. + * + * Batch size defines how big are batches sent to DLP at once in bytes. Review comment: Are you agreeing that the comments should be moved to the methods, or that the comments are also useful here for the template inspection (I'm unfamiliar with what the 'inspect contents' and 'inspect template' actions)? 
## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.auto.value.AutoValue; +import com.google.cloud.dlp.v2.DlpServiceClient; +import com.google.privacy.dlp.v2.ContentItem; +import com.google.privacy.dlp.v2.DeidentifyConfig; +import com.google.privacy.dlp.v2.DeidentifyContentRequest; +import com.google.privacy.dlp.v2.DeidentifyContentResponse; +import com.google.privacy.dlp.v2.FieldId; +import com.google.privacy.dlp.v2.InspectConfig; +import com.google.privacy.dlp.v2.ProjectName; +import com.google.privacy.dlp.v2.Table; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.value
[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427643387 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of Review comment: ```suggestion * This transform is optimized for splitting of message.list calls for large batches of ```
[GitHub] [beam] lukecwik commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner
lukecwik commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631126898 R: @boyuanzz
[GitHub] [beam] lukecwik commented on pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner
lukecwik commented on pull request #11753: URL: https://github.com/apache/beam/pull/11753#issuecomment-631126805 retest this please
[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427641350 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. + * + * Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of + * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges + * (default daily) to achieve parallelization in separate messages.list calls. + * + * This will make more queries than necessary when used with very small data sets. (or very + * sparse data sets in the sendTime dimension). + * + * If you have large but sparse data (e.g. hours between consecutive message sendTimes) and + * know something about the time ranges where you have no data, consider using multiple instances + * of this transform specifying sendTime filters to omit the ranges where there is no data. Review comment: That's great to know! will remove this guidance as it will lead to unnecessary complexity.
[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427640873 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. + * + * Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of + * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges + * (default daily) to achieve parallelization in separate messages.list calls. Review comment: I originally included this for users who may try to benchmark this against tiny / sparse results set and be surprised why it is slow / making so many api calls. I see your point will remove.
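The "initial splitting into non-overlapping time ranges (default daily)" that this javadoc describes can be sketched in Python (hypothetical helper; HL7v2IO itself is Java): chop the store's observed `[min sendTime, max sendTime)` interval into half-open ranges so each range can back one messages.list call.

```python
from datetime import datetime, timedelta

def initial_send_time_splits(min_send_time, max_send_time,
                             split_duration=timedelta(days=1)):
    """Sketch of the initial sendTime splits: half-open, non-overlapping
    ranges of `split_duration` covering [min_send_time, max_send_time).
    The final range is clamped so it never extends past max_send_time."""
    ranges = []
    lo = min_send_time
    while lo < max_send_time:
        hi = min(lo + split_duration, max_send_time)
        ranges.append((lo, hi))
        lo = hi
    return ranges
```

This also makes the review's caveat concrete: a store whose data spans many days but is sparse in sendTime still produces one messages.list call per day, most of which return empty pages.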
[GitHub] [beam] jaketf commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
jaketf commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427640111 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. Review comment: correct.
[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
lukecwik commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427618071 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of Review comment: Consider using `{@code ...}` when referring to code and `{@link ...}` for things you can directly link against. ```suggestion * This transform is optimized for dynamic splitting of {@code message.list} calls for large batches of ```
[GitHub] [beam] lukecwik commented on a change in pull request #11596: [BEAM-9856] Optimization/hl7v2 io list messages
lukecwik commented on a change in pull request #11596: URL: https://github.com/apache/beam/pull/11596#discussion_r427616856 ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. Review comment: wouldn' this just be a small amount of waste since we would effectively get an empty response? ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. + * + * Implementation includes overhead for: 1. 
two api calls to determine the min/max sendTime of Review comment: consider using `` and `` tags in the javadoc for your ordered list ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -415,10 +423,29 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) } } - /** List HL7v2 messages in HL7v2 Stores with optional filter. */ + /** + * List HL7v2 messages in HL7v2 Stores with optional filter. + * + * This transform is optimized for dynamic splitting of message.list calls for large batches of + * historical data and assumes rather continuous stream of sendTimes. It will dynamically + * rebalance resources to handle "peak traffic times" but will waste resources if there are large + * durations (days) of the sendTime dimension without data. + * + * Implementation includes overhead for: 1. two api calls to determine the min/max sendTime of + * the HL7v2 store at invocation time. 2. initial splitting into non-overlapping time ranges + * (default daily) to achieve parallelization in separate messages.list calls. + * + * This will make more queries than necessary when used with very small data sets. (or very + * sparse data sets in the sendTime dimension). + * + * If you have large but sparse data (e.g. hours between consecutive message sendTimes) and + * know something about the time ranges where you have no data, consider using multiple instances + * of this transform specifying sendTime filters to omit the ranges where there is no data. + */ public static class ListHL7v2Messages extends PTransform> { -private final List hl7v2Stores; -private final String filter; +private final ValueProvider> hl7v2Stores; +private ValueProvider filter; +private Duration initialSplitDuration; Review comment: even if a member variable is null, it should still be final since it doesn't look like we mutate it locally. Same reason for other places I suggest to change this. 
```suggestion private final ValueProvider filter; private final Duration initialSplitDuration; ``` ## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java ## @@ -427,29 +454,75 @@ private Message fetchMessage(HealthcareApiClient client, String msgId) * @param filter the filter */ ListHL7v2Messages(ValueProvider> hl7v2Stores, ValueProvider filter) { - this.hl7v2Stores = hl7v2Stores.get(); - this.filter = filter.get(); + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; +} + +/** + * Instantiates a new List hl 7 v 2 messages. + * + * @param hl7v2Stores the hl 7 v 2 stores + * @param filter the filter + * @param initialSplitDuration the initial split duration for sendTime dimension splits + */ +ListHL7v2Messages( +ValueProvider> hl7v2Stores, +ValueProvider filter, +Duration initialSplitDuration) { + this.hl7v2Stores = hl7v2Stores; + this.filter = filter; + this.initialSplitDuration = initial
[GitHub] [beam] pabloem commented on pull request #11736: Katas - Convert task description from HTML to Markdown
pabloem commented on pull request #11736: URL: https://github.com/apache/beam/pull/11736#issuecomment-631112295 I don't know why the SQL precommit is running : ) All i care about is the RAT precommit - which should pass...
[GitHub] [beam] pabloem commented on pull request #11736: Katas - Convert task description from HTML to Markdown
pabloem commented on pull request #11736: URL: https://github.com/apache/beam/pull/11736#issuecomment-63401 retest this please
[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631105651 Retest this please
[GitHub] [beam] chamikaramj commented on pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
chamikaramj commented on pull request #11651: URL: https://github.com/apache/beam/pull/11651#issuecomment-631105532 Retest this please
[GitHub] [beam] veblush commented on a change in pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
veblush commented on a change in pull request #11651: URL: https://github.com/apache/beam/pull/11651#discussion_r427623777 ## File path: buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy ## @@ -489,6 +490,7 @@ class BeamModulePlugin implements Plugin { grpc_protobuf : "io.grpc:grpc-protobuf:$grpc_version", grpc_protobuf_lite : "io.grpc:grpc-protobuf-lite:$grpc_version", grpc_netty : "io.grpc:grpc-netty:$grpc_version", +grpc_netty_shaded : "io.grpc:grpc-netty-shaded:$grpc_version", Review comment: Note that current beam already has it from [gax-grpc](https://mvnrepository.com/artifact/com.google.api/gax-grpc/1.54.0) transitively. This can make sure that all these components are working with the same version.
[GitHub] [beam] y1chi opened a new pull request #11753: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi opened a new pull request #11753: URL: https://github.com/apache/beam/pull/11753

Implemented the missing pieces in FnApiDoFnRunner to support timer families. Also refactored a few function signatures to avoid confusion.

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
[GitHub] [beam] amaliujia merged pull request #11737: [BEAM-9984] Support BIT_OR aggregation function in Beam SQL
amaliujia merged pull request #11737: URL: https://github.com/apache/beam/pull/11737
[GitHub] [beam] pabloem commented on pull request #11339: [BEAM-9468] Fhir io
pabloem commented on pull request #11339: URL: https://github.com/apache/beam/pull/11339#issuecomment-631081683 Run Java PreCommit
[GitHub] [beam] chamikaramj merged pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image
chamikaramj merged pull request #11740: URL: https://github.com/apache/beam/pull/11740
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11744: [BEAM-10007] Handle ValueProvider pipeline options in PortableRunner
TheNeuralBit commented on a change in pull request #11744: URL: https://github.com/apache/beam/pull/11744#discussion_r427592587

File path: sdks/python/apache_beam/runners/portability/portable_runner.py

@@ -164,10 +165,19 @@ def add_runner_options(parser):
     all_options = self.options.get_all_options(
         add_extra_args_fn=add_runner_options,
         retain_unknown_options=self._retain_unknown_options)
+    # TODO: Define URNs for options.
     # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
+    def convert_pipeline_option_value(v):
+      if type(v) == int:

Review comment: Oh, the relevant JIRA is linked right there. It seems this is really a workaround for a bug in json_format where ints and floats aren't treated differently. I can look into fixing that upstream so we can get rid of the special case here. I don't think it would be good to call str(v) on everything.
[GitHub] [beam] chamikaramj commented on pull request #11740: [BEAM-8019] Prevent Dataflow from starting multiple containers for the same image
chamikaramj commented on pull request #11740: URL: https://github.com/apache/beam/pull/11740#issuecomment-631075979 Thank you.
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11744: [BEAM-10007] Handle ValueProvider pipeline options in PortableRunner
TheNeuralBit commented on a change in pull request #11744: URL: https://github.com/apache/beam/pull/11744#discussion_r427586062

File path: sdks/python/apache_beam/runners/portability/portable_runner.py

@@ -164,10 +165,19 @@ def add_runner_options(parser):
     all_options = self.options.get_all_options(
         add_extra_args_fn=add_runner_options,
         retain_unknown_options=self._retain_unknown_options)
+    # TODO: Define URNs for options.
     # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
+    def convert_pipeline_option_value(v):
+      if type(v) == int:

Review comment: Good question. I was just maintaining the status quo and adding a special case for `ValueProvider`. Let me see if I can dig up why the str(v) is there for ints.
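The special case under discussion can be sketched as follows. This is a hypothetical illustration of the behavior the reviewers describe, not Beam's actual portable_runner.py code: json_format round-trips values through google.protobuf.Struct, whose only numeric type is a double, so a plain int would come back as a float (BEAM-5509), and stringifying ints sidesteps that.

```python
def convert_pipeline_option_value(v):
    """Hypothetical sketch of the int special case discussed above.

    A protobuf Struct stores every number as a double, so an int
    that round-trips through json_format would come back as a float.
    Stringifying ints avoids the silent widening; everything else
    passes through unchanged.
    """
    if isinstance(v, bool):
        # bool is a subclass of int in Python; leave it alone.
        return v
    if isinstance(v, int):
        return str(v)  # avoid int -> float widening in the Struct round trip
    return v
```

As the reviewer notes, calling str(v) on everything would change the type of every option value, so limiting the workaround to ints is the narrower fix.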
[GitHub] [beam] tvalentyn commented on pull request #11707: [BEAM-9810] Add a Tox (precommit) suite for Python 3.8
tvalentyn commented on pull request #11707: URL: https://github.com/apache/beam/pull/11707#issuecomment-631067325 The code change LGTM, thank you. We need to address BEAM-9994 before we can merge this. Would you have time to investigate and recommend a solution for that issue, @kamilwu?
[GitHub] [beam] veblush commented on a change in pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
veblush commented on a change in pull request #11651: URL: https://github.com/apache/beam/pull/11651#discussion_r427581972

File path: sdks/java/io/google-cloud-platform/build.gradle

@@ -56,11 +56,13 @@ dependencies {
   compile library.java.google_http_client
   compile library.java.google_http_client_jackson2
   compile library.java.grpc_all
+  compile library.java.grpc_alts
   compile library.java.grpc_auth
   compile library.java.grpc_core
   compile library.java.grpc_context
   compile library.java.grpc_grpclb
   compile library.java.grpc_netty
+  compile library.java.grpc_netty_shaded

Review comment: Same as above.
[GitHub] [beam] veblush commented on a change in pull request #11651: [BEAM-8889] Upgrades gcsio to 2.1.3
veblush commented on a change in pull request #11651: URL: https://github.com/apache/beam/pull/11651#discussion_r427581456

File path: buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

@@ -489,6 +490,7 @@ class BeamModulePlugin implements Plugin {
     grpc_protobuf       : "io.grpc:grpc-protobuf:$grpc_version",
     grpc_protobuf_lite  : "io.grpc:grpc-protobuf-lite:$grpc_version",
     grpc_netty          : "io.grpc:grpc-netty:$grpc_version",
+    grpc_netty_shaded   : "io.grpc:grpc-netty-shaded:$grpc_version",

Review comment: This comes from grpc-alts (grpc-alts is for DirectPath).