[GitHub] [beam] pawelpasterz commented on a change in pull request #11956: [BEAM-8133] Publishing results of Nexmark tests to InfluxDB
pawelpasterz commented on a change in pull request #11956: URL: https://github.com/apache/beam/pull/11956#discussion_r445336567

File path: sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java

```java
@@ -142,21 +148,27 @@ void runAll(String[] args) throws IOException {
       saveSummary(null, configurations, actual, baseline, start, options);
     }

-    if (options.getExportSummaryToBigQuery()) {
-      ImmutableMap<String, String> schema =
-          ImmutableMap.builder()
-              .put("timestamp", "timestamp")
-              .put("runtimeSec", "float")
-              .put("eventsPerSec", "float")
-              .put("numResults", "integer")
-              .build();
+    final ImmutableMap<String, String> schema =
+        ImmutableMap.builder()
+            .put("timestamp", "timestamp")
+            .put("runtimeSec", "float")
+            .put("eventsPerSec", "float")
+            .put("numResults", "integer")
+            .build();
+
+    if (options.getExportSummaryToBigQuery()) {
       savePerfsToBigQuery(
           BigQueryResultsPublisher.create(options.getBigQueryDataset(), schema),
           options,
           actual,
           start);
     }
+
+    if (options.getExportSummaryToInfluxDB()) {
+      final long timestamp = start.getMillis() / 1000; // seconds
```

Review comment: The default precision is nanoseconds. In the case of Nexmark results we changed it and use seconds instead:

```java
return new HttpPost(
    settings.host + "/write?db=" + settings.database + "&" + retentionPolicy + "&precision=s");
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
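The comment above explains that the publisher writes with `&precision=s`, so the trailing timestamp of each line-protocol record must be epoch seconds rather than the default nanoseconds. A minimal, hypothetical Python sketch of that contract (not Beam's `InfluxDBPublisher`; the measurement and field names are made up for illustration):

```python
def influx_line(measurement, fields, timestamp_millis):
    """Build one InfluxDB line-protocol record with a seconds timestamp.

    With precision=s in the /write URL, InfluxDB interprets the trailing
    integer as epoch seconds, so millis are divided by 1000 first
    (mirroring `start.getMillis() / 1000` in the diff above).
    """
    timestamp_s = timestamp_millis // 1000
    field_str = ",".join("%s=%s" % (k, v) for k, v in sorted(fields.items()))
    return "%s %s %d" % (measurement, field_str, timestamp_s)


def write_url(host, database):
    # Target endpoint; precision=s tells InfluxDB the timestamps are seconds.
    return "%s/write?db=%s&precision=s" % (host, database)
```

For example, `influx_line("nexmark_Query0", {"runtimeSec": 12.5}, 1593000000123)` yields `nexmark_Query0 runtimeSec=12.5 1593000000`.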
[GitHub] [beam] piotr-szuberski commented on pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform
piotr-szuberski commented on pull request #12023: URL: https://github.com/apache/beam/pull/12023#issuecomment-649254410

Run Python PreCommit
[GitHub] [beam] pawelpasterz commented on a change in pull request #11956: [BEAM-8133] Publishing results of Nexmark tests to InfluxDB
pawelpasterz commented on a change in pull request #11956: URL: https://github.com/apache/beam/pull/11956#discussion_r445331560

File path: sdks/java/testing/test-utils/src/main/java/org/apache/beam/sdk/testutils/publishing/InfluxDBPublisher.java

```java
@@ -66,30 +125,43 @@ private static void publish(
       builder.setDefaultCredentialsProvider(provider);
     }

-    final HttpPost postRequest = new HttpPost(settings.host + "/write?db=" + settings.database);
+    return builder;
+  }

-    final StringBuilder metricBuilder = new StringBuilder();
-    results.stream()
```

Review comment: Hm... sure, we can.
[GitHub] [beam] rmannibucau commented on pull request #11886: [BEAM-8647] delete .mailmap
rmannibucau commented on pull request #11886: URL: https://github.com/apache/beam/pull/11886#issuecomment-649238480

@kennknowles the argument that "it is public on GitHub, so it can be made public in the git repo" is wrong. When you have a GitHub account you ack (or not) that, and you can request to drop it under the GDPR, so this file would have to be regenerated each time GitHub changes anything (I assume once a day would work). Also, to be legal, the project must request authorization to use these data there, track the ack, and allow changing it. AFAIK neither is done (right?), so this workflow does not work and can trivially be replaced by a GitHub request - falling back on "unknown" or anything you like when the email is no longer available; companies can request that GitHub be GDPR compliant, which means data can be missing. You can also reverse the pattern and just list duplicates instead, and maybe handle the name case-insensitively? That reduces the number of entries a lot AFAIK.
[GitHub] [beam] rezarokni commented on pull request #11929: [BEAM-10201] Add deadletter support to JsonToRow
rezarokni commented on pull request #11929: URL: https://github.com/apache/beam/pull/11929#issuecomment-649232286

@reuvenlax nudge :-)
[GitHub] [beam] rafi-kamal opened a new pull request #12089: [BEAM-10283] Add new overloads of withKeyRanges and withRowFilter met…
rafi-kamal opened a new pull request #12089: URL: https://github.com/apache/beam/pull/12089 …hods that take ValueProvider as a parameter. **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch) Lang | SDK | Dataflow | Flink | Samza | Spark --- | --- | --- | --- | --- | --- Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org
[GitHub] [beam] piotr-szuberski removed a comment on pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform
piotr-szuberski removed a comment on pull request #12023: URL: https://github.com/apache/beam/pull/12023#issuecomment-649225729
[GitHub] [beam] piotr-szuberski removed a comment on pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform
piotr-szuberski removed a comment on pull request #12023: URL: https://github.com/apache/beam/pull/12023#issuecomment-648777087

Run Python PreCommit
[GitHub] [beam] piotr-szuberski commented on pull request #12023: [BEAM-10135] Add Python wrapper for Jdbc Write external transform
piotr-szuberski commented on pull request #12023: URL: https://github.com/apache/beam/pull/12023#issuecomment-649225729

Run Python PreCommit
[GitHub] [beam] kennknowles commented on pull request #12020: [BEAM-9066] Add javax.annotation-api dependency.
kennknowles commented on pull request #12020: URL: https://github.com/apache/beam/pull/12020#issuecomment-649188578

TBH I am not sure. I believe that GPL + classpath exception is standard and probably best. It is what allows all Java users to make Java programs that are not GPL.
[GitHub] [beam] robinyqiu commented on pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL
robinyqiu commented on pull request #12054: URL: https://github.com/apache/beam/pull/12054#issuecomment-649183345

retest this please
[GitHub] [beam] robinyqiu commented on a change in pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL
robinyqiu commented on a change in pull request #12054: URL: https://github.com/apache/beam/pull/12054#discussion_r445279103

File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java

```java
@@ -27,4 +28,7 @@ private SqlTypes() {}

   /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL DATE type. */
   public static final LogicalType DATE = new Date();
+
+  /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL TIME type. */
+  public static final LogicalType TIME = new Time();
```

Review comment: Now the internal build passes. Thanks for the question. The conversion is lossless. It is actually done by calling ZetaSQL code: `CivilTimeEncoder.decodePacked64TimeNanosAsJavaTime()` and `encodePacked64TimeNanos()`. See the conversion code in `ZetaSqlBeamTranslationUtils.java`. I looked at the code and it seems to be purely bit manipulation, so I guess it's not very expensive.
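As a rough illustration of why such a round trip can be lossless, here is a hedged Python sketch of a packed-64 civil-time encoding. The bit layout used here (hour | minute:6 bits | second:6 bits | nanos:30 bits) is an assumption for illustration and may differ from ZetaSQL's actual `CivilTimeEncoder` layout:

```python
NANO_BITS, SEC_BITS, MIN_BITS = 30, 6, 6  # assumed field widths

def encode_time_nanos(hour, minute, second, nanos):
    # Pack the four fields into one integer; pure bit manipulation, no loss.
    return (((hour << MIN_BITS | minute) << SEC_BITS | second) << NANO_BITS) | nanos

def decode_time_nanos(packed):
    # Unpack by masking each field back out in reverse order.
    nanos = packed & ((1 << NANO_BITS) - 1)
    second = (packed >> NANO_BITS) & ((1 << SEC_BITS) - 1)
    minute = (packed >> (NANO_BITS + SEC_BITS)) & ((1 << MIN_BITS) - 1)
    hour = packed >> (NANO_BITS + SEC_BITS + MIN_BITS)
    return hour, minute, second, nanos
```

Because every field gets its own disjoint bit range, `decode(encode(...))` round-trips exactly, which is what makes the TIME conversion lossless.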
[GitHub] [beam] udim commented on a change in pull request #12088: Add output typehints to GroupIntoBatches
udim commented on a change in pull request #12088: URL: https://github.com/apache/beam/pull/12088#discussion_r445278906

File path: sdks/python/apache_beam/transforms/util.py

```py
@@ -741,6 +741,7 @@ def WithKeys(pcoll, k):
 @experimental()
 @typehints.with_input_types(Tuple[K, V])
+@typehints.with_output_types(Tuple[K, List[V]])
```

Review comment: cc: @lukecwik
[GitHub] [beam] udim commented on a change in pull request #12088: Add output typehints to GroupIntoBatches
udim commented on a change in pull request #12088: URL: https://github.com/apache/beam/pull/12088#discussion_r445278714

File path: sdks/python/apache_beam/transforms/util.py

```py
@@ -741,6 +741,7 @@ def WithKeys(pcoll, k):
 @experimental()
 @typehints.with_input_types(Tuple[K, V])
+@typehints.with_output_types(Tuple[K, List[V]])
```

Review comment: I was thinking it might be Iterable rather than List, so I looked at the code and it doesn't seem to group by key at all. I modified a test from `GroupIntoBatchesTest` to print the elements:

```py
def test_in_global_window(self):
  with TestPipeline() as pipeline:
    def print_elements(e):
      print(e)
      return e

    collection = pipeline \
        | beam.Create(GroupIntoBatchesTest._create_test_data()) \
        | util.GroupIntoBatches(GroupIntoBatchesTest.BATCH_SIZE) \
        | beam.Map(print_elements)
```

Printout:

```py
[('key', 'Einstein'), ('key', 'Darwin'), ('key', 'Copernicus'), ('key', 'Pasteur'), ('key', 'Curie')]
[('key', 'Faraday'), ('key', 'Newton'), ('key', 'Bohr'), ('key', 'Galilei'), ('key', 'Maxwell')]
```
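The printout is consistent with plain fixed-size batching in arrival order rather than a per-key grouping. A minimal pure-Python sketch of that behavior (not Beam's implementation) is:

```python
def group_into_batches(elements, batch_size):
    """Yield lists of up to batch_size (key, value) pairs in arrival order.

    Note: elements are batched as they arrive; there is no grouping by
    key, matching the printout in the review comment above.
    """
    batch = []
    for kv in elements:
        batch.append(kv)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # emit any trailing partial batch
        yield batch
```

With ten `('key', ...)` pairs and a batch size of 5, this produces exactly two lists of five pairs, as in the observed output.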
[GitHub] [beam] udim commented on a change in pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()
udim commented on a change in pull request #12009: URL: https://github.com/apache/beam/pull/12009#discussion_r445246250

File path: sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py

```py
@@ -40,6 +40,14 @@ def process(self, element: int) -> typehints.Tuple[str]:
     with self.assertRaisesRegex(typehints.TypeCheckError,
                                 r'requires.*int.*got.*str'):
       _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())

+  def test_pardo_dofn(self):
```

Review comment: Did you mean to leave this test here? It looks like a copy of the one in AnnotationsTest.

File path: sdks/python/apache_beam/typehints/typehints_test_py3.py

```py
@@ -46,11 +51,61 @@ class MyDoFn(DoFn):
   def process(self, element: int) -> Iterable[str]:
     pass

-  print(MyDoFn().get_type_hints())
   th = MyDoFn().get_type_hints()
   self.assertEqual(th.input_types, ((int, ), {}))
   self.assertEqual(th.output_types, ((str, ), {}))

+class TestPTransformAnnotations(unittest.TestCase):
+  def test_pep484_annotations(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_annotations_without_pcollection_wrapper(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: int) -> str:
+        return pcoll | Map(lambda num: str(num))
+
+    with self.assertRaises(TypeCheckError) as error:
+      _th = MyPTransform().get_type_hints()
+
+    self.assertEqual(
+        str(error.exception),
+        'An input typehint to a PTransform must be a single (or nested) type '
+        'wrapped by a PCollection.')
+
+  def test_annotations_without_internal_type(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection) -> PCollection:
```

Review comment: This is valid. The type hint should convert to `Any`. Quoting from https://docs.python.org/3/library/typing.html:

> Using a generic class without specifying type parameters assumes Any for each position.

File path: sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py

```py
@@ -257,6 +265,65 @@ def fn2(element: int) -> int:
     result = [1, 2, 3] | beam.FlatMap(fn) | beam.Map(fn2)
     self.assertCountEqual([4, 6], result)

+  def test_typed_ptransform_with_no_error(self):
+    class StrToInt(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    try:
+      _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+    except Exception:
+      self.fail('An unexpected error was raised during a pipeline with correct typehints.')
+
+  def test_typed_ptransform_with_bad_typehints(self):
+    class StrToInt(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    with self.assertRaises(typehints.TypeCheckError) as error:
+      # raises error because of mismatched typehints between StrToInt and IntToStr
+      _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+
+    self.assertTrue("Input type hint violation at IntToStr: expected , got " in str(error.exception))
+
+  def test_typed_ptransform_with_bad_input(self):
+    class StrToInt(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    with self.assertRaises(typehints.TypeCheckError) as error:
+      # Feed integers to a PTransform that expects strings
+      _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+    self.assertTrue("Input type hint violation at StrToInt: expected , got " in str(error.exception))
```

Review comment: Please use `with self.assertRaisesRegex(..)` above instead of separately checking the exception text.

File path: sdks/python/apache_beam/transforms/ptransform.py

```py
@@ -364,6 +366,15 @@ def default_label(self):
     # type: () -> str
     return self.__class__.__name__

+  def default_type_hints(self):
+    fn_type_hints = IOTypeHints.from_callable(self.expand)
+    if fn_type_hints is no
```
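To sketch how PEP 484 annotations on `expand()` can be turned into input/output type hints, one can unwrap the `PCollection[...]` annotations with the `typing` introspection helpers. This is a hypothetical stand-alone illustration, not Beam's `IOTypeHints.from_callable`; the `PCollection` class and `hints_from_expand` helper here are made up for the sketch:

```python
import typing

T = typing.TypeVar('T')

class PCollection(typing.Generic[T]):
    """Hypothetical stand-in for beam.pvalue.PCollection."""

def hints_from_expand(expand_fn):
    # Collect the annotations and unwrap the PCollection[...] wrappers.
    hints = typing.get_type_hints(expand_fn)
    output = hints.pop('return')
    (pcoll_hint,) = hints.values()  # expand takes a single pcoll argument
    for hint in (pcoll_hint, output):
        if typing.get_origin(hint) is not PCollection:
            raise TypeError(
                'expand() type hints must be wrapped in a PCollection')
    return typing.get_args(pcoll_hint)[0], typing.get_args(output)[0]

def expand(pcoll: PCollection[str]) -> PCollection[int]:
    ...
```

Here `hints_from_expand(expand)` returns `(str, int)`, and an annotation that is not wrapped in `PCollection[...]` raises `TypeError`, mirroring the error case tested above. (`typing.get_origin`/`get_args` require Python 3.8+.)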
[GitHub] [beam] aaltay opened a new pull request #12088: Add output typehints to GroupIntoBatches
aaltay opened a new pull request #12088: URL: https://github.com/apache/beam/pull/12088
[GitHub] [beam] TheNeuralBit commented on pull request #12067: [BEAM-10308] Make component ID assignments consistent across PipelineContext instances
TheNeuralBit commented on pull request #12067: URL: https://github.com/apache/beam/pull/12067#issuecomment-649168647

I have a POC working that solves the problem by fixing all of the component IDs in the external transforms in `to_runner_api()`. It feels pretty error-prone, but it doesn't maintain any global state, and keeps all of the special logic within ExternalTransform. I'll clean it up and push it here tomorrow morning.
[GitHub] [beam] ihji commented on pull request #12086: [BEAM-10322] allow only single assignment to producing stages by pcol…
ihji commented on pull request #12086: URL: https://github.com/apache/beam/pull/12086#issuecomment-649161049

R: @pablom CC: @robertwb
[GitHub] [beam] ihji commented on a change in pull request #12060: [BEAM-10218] Add FnApiRunner to cross-language validate runner test
ihji commented on a change in pull request #12060: URL: https://github.com/apache/beam/pull/12060#discussion_r445260087

File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py

```py
@@ -375,6 +375,8 @@ def get_all_side_inputs():
     for o in transform.outputs.values():
       if o in s.side_inputs():
         continue
+      if o in producing_stages_by_pcoll:
```

Review comment: Done.
[GitHub] [beam] ihji commented on pull request #12085: [BEAM-10318] fix uninitialized grpc_server in FnApiRunner
ihji commented on pull request #12085: URL: https://github.com/apache/beam/pull/12085#issuecomment-649161136

R: @chamikaramj
[GitHub] [beam] ihji commented on pull request #12087: [BEAM-10321] retain environments in flatten for preventing it from be…
ihji commented on pull request #12087: URL: https://github.com/apache/beam/pull/12087#issuecomment-649160879

R: @pabloem CC: @robertwb
[GitHub] [beam] ihji opened a new pull request #12087: [BEAM-10321] retain environments in flatten for preventing it from be…
ihji opened a new pull request #12087: URL: https://github.com/apache/beam/pull/12087

…ing fused into stages running in foreign language SDKs
[GitHub] [beam] udim commented on a change in pull request #12076: [BEAM-10143] Add test verifying external windowing
udim commented on a change in pull request #12076: URL: https://github.com/apache/beam/pull/12076#discussion_r445259660

## File path: sdks/python/apache_beam/transforms/sql_test.py ##

```py
@@ -157,6 +157,21 @@ def test_zetasql_generate_data(self):
          dialect="zetasql")
      assert_that(out, equal_to([(1, "foo", 3.14)]))

+  def test_windowing_before_sql(self):
+    with TestPipeline() as p:
+      windowed = (
+          p | beam.Create([
+              SimpleRow(5, "foo", 1.),
+              SimpleRow(15, "bar", 2.),
+              SimpleRow(25, "baz", 3.)
+          ])
+          | beam.Map(lambda v: beam.window.TimestampedValue(v, v.id)).
+          with_output_types(SimpleRow)
```

Review comment: https://issues.apache.org/jira/browse/BEAM-10323

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ihji opened a new pull request #12086: [BEAM-10322] allow only single assignment to producing stages by pcol…
ihji opened a new pull request #12086: URL: https://github.com/apache/beam/pull/12086 …lection map
[GitHub] [beam] ihji opened a new pull request #12085: [BEAM-10318] fix uninitialized grpc_server in FnApiRunner
ihji opened a new pull request #12085: URL: https://github.com/apache/beam/pull/12085
[GitHub] [beam] udim commented on a change in pull request #12076: [BEAM-10143] Add test verifying external windowing
udim commented on a change in pull request #12076: URL: https://github.com/apache/beam/pull/12076#discussion_r445257811

## File path: sdks/python/apache_beam/transforms/sql_test.py ##

```py
@@ -157,6 +157,21 @@ def test_zetasql_generate_data(self):
          dialect="zetasql")
      assert_that(out, equal_to([(1, "foo", 3.14)]))

+  def test_windowing_before_sql(self):
+    with TestPipeline() as p:
+      windowed = (
+          p | beam.Create([
+              SimpleRow(5, "foo", 1.),
+              SimpleRow(15, "bar", 2.),
+              SimpleRow(25, "baz", 3.)
+          ])
+          | beam.Map(lambda v: beam.window.TimestampedValue(v, v.id)).
+          with_output_types(SimpleRow)
```

Review comment: I'm not sure there is a more elegant way. Even if you turn that lambda into a function, the output type decorator and the actual return value will disagree:

```py
def test_timestamped_value(self):
  @beam.typehints.with_input_types(int)
  @beam.typehints.with_output_types(int)
  def timestamped(e):
    return beam.window.TimestampedValue(e, 0)

  with TestPipeline() as p:
    pcoll = p | beam.Create([1, 2, 3]) | beam.Map(timestamped)
  self.assertEqual(int, pcoll.element_type)
```

I prefer the above style to inlining `.with_output_types`, so it's clear I'm not making an exception. This mismatch is normal in Beam: a `DoFn.process()`'s return type and type hint disagree as well. We should probably add functionality to support annotating the above function like this:

```py
def timestamped(e: int) -> beam.window.TimestampedValue[int]:
  return beam.window.TimestampedValue(e, 0)
```

And `pcoll.element_type` will be interpreted as `int`. Same for `WindowedValue`s.
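The interpretation udim proposes above — reading a `-> TimestampedValue[int]` return annotation as element type `int` — can be sketched with stdlib typing introspection. Note this is a hypothetical illustration: the `TimestampedValue` class and `element_type` helper here are stand-ins, not Beam APIs.

```python
import typing

T = typing.TypeVar("T")


class TimestampedValue(typing.Generic[T]):
    """Stand-in for beam.window.TimestampedValue, for illustration only."""

    def __init__(self, value: T, timestamp: float):
        self.value = value
        self.timestamp = timestamp


def element_type(fn):
    """Interpret `-> TimestampedValue[X]` as element type X; otherwise
    return the annotation itself. This mimics the proposed unwrapping."""
    hint = typing.get_type_hints(fn).get("return")
    if typing.get_origin(hint) is TimestampedValue:
        # Unwrap the wrapped element type from the generic alias.
        return typing.get_args(hint)[0]
    return hint


def timestamped(e: int) -> TimestampedValue[int]:
    return TimestampedValue(e, 0)
```

With this convention, `element_type(timestamped)` yields `int`, so the runtime element type and the annotation no longer disagree.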
[GitHub] [beam] chamikaramj commented on pull request #12070: Add some metrics for BigQueryStorageStreamReader
chamikaramj commented on pull request #12070: URL: https://github.com/apache/beam/pull/12070#issuecomment-649157020 Please add a JIRA.
[GitHub] [beam] chamikaramj commented on a change in pull request #12070: Add some metrics for BigQueryStorageStreamReader
chamikaramj commented on a change in pull request #12070: URL: https://github.com/apache/beam/pull/12070#discussion_r445255937

## File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java ##

```java
@@ -219,7 +232,15 @@ private synchronized boolean readNextRecord() throws IOException {
     }
     fractionConsumedFromPreviousResponse = fractionConsumedFromCurrentResponse;
-    ReadRowsResponse currentResponse = responseIterator.next();
+    ReadRowsResponse currentResponse;
+    Stopwatch stopwatch = Stopwatch.createStarted();
```

Review comment: Do you think these operations can have a performance impact? If so, would it make sense to add an option to disable/enable them?
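The option chamikaramj suggests is essentially a flag guarding the hot-path timing. A minimal language-agnostic sketch of that pattern (the `TimedReader` class and its names are hypothetical, not the actual `BigQueryStorageStreamReader` code):

```python
import time


class TimedReader:
    """Wrap an iterator and accumulate per-read latency, but only when
    metrics are enabled, so the stopwatch overhead can be opted out of."""

    def __init__(self, iterator, metrics_enabled=True):
        self._it = iterator
        self._metrics_enabled = metrics_enabled
        self.total_read_seconds = 0.0  # accumulated time spent in read()

    def read(self):
        if not self._metrics_enabled:
            # Hot path: no clock calls at all when metrics are disabled.
            return next(self._it)
        start = time.monotonic()
        try:
            return next(self._it)
        finally:
            self.total_read_seconds += time.monotonic() - start
```

The design point is that disabling metrics removes the clock calls entirely rather than merely discarding the measurement, which is what makes the option worthwhile if the timing itself is the overhead in question.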
[GitHub] [beam] chamikaramj commented on a change in pull request #12060: [BEAM-10218] Add FnApiRunner to cross-language validate runner test
chamikaramj commented on a change in pull request #12060: URL: https://github.com/apache/beam/pull/12060#discussion_r445254132

## File path: sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py ##

```py
@@ -375,6 +375,8 @@ def get_all_side_inputs():
     for o in transform.outputs.values():
       if o in s.side_inputs():
         continue
+      if o in producing_stages_by_pcoll:
```

Review comment: Could you send the runner changes as separate PR(s)?
[GitHub] [beam] chamikaramj commented on pull request #11850: [BEAM-1438] Allow 0 shards on WriteFiles streaming
chamikaramj commented on pull request #11850: URL: https://github.com/apache/beam/pull/11850#issuecomment-649153502 Have we tried out Dataflow streaming pipelines to make sure this works as expected? We should probably also add an integration/performance test for this case, given that many of our users will likely end up using this path instead of specifying a value. cc: @dpmills @slavachernyak @reuvenlax
[GitHub] [beam] ajamato commented on pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
ajamato commented on pull request #12084: URL: https://github.com/apache/beam/pull/12084#issuecomment-649150134 @pabloem @chamikaramj
[GitHub] [beam] ajamato opened a new pull request #12084: [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
ajamato opened a new pull request #12084: URL: https://github.com/apache/beam/pull/12084 [BEAM-10317] Python - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
[GitHub] [beam] chamikaramj merged pull request #11834: [BEAM-10117] Correct erroneous Job Failed message
chamikaramj merged pull request #11834: URL: https://github.com/apache/beam/pull/11834
[GitHub] [beam] chamikaramj commented on pull request #11834: [BEAM-10117] Correct erroneous Job Failed message
chamikaramj commented on pull request #11834: URL: https://github.com/apache/beam/pull/11834#issuecomment-649147313 LGTM. Thanks. Sorry about the delay.
[GitHub] [beam] ajamato commented on pull request #12083: [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
ajamato commented on pull request #12083: URL: https://github.com/apache/beam/pull/12083#issuecomment-649146663 R: @pabloem @chamikaramj
[GitHub] [beam] ajamato opened a new pull request #12083: [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
ajamato opened a new pull request #12083: URL: https://github.com/apache/beam/pull/12083 [BEAM-10317] Java - Update BigQueryIO to tag BigQuery Jobs with the Dataflow Job ID
[GitHub] [beam] chamikaramj commented on pull request #11453: Don't use protobuf toString for BQ storage API protos
chamikaramj commented on pull request #11453: URL: https://github.com/apache/beam/pull/11453#issuecomment-649146270 Ping?
[GitHub] [beam] apilloud commented on a change in pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL
apilloud commented on a change in pull request #12054: URL: https://github.com/apache/beam/pull/12054#discussion_r445244602

## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/SqlTypes.java ##

```java
@@ -27,4 +28,7 @@ private SqlTypes() {}

   /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL DATE type. */
   public static final LogicalType DATE = new Date();
+
+  /** Beam LogicalType corresponding to ZetaSQL/CalciteSQL TIME type. */
+  public static final LogicalType TIME = new Time();
```

Review comment: This is the most important line of this CL for correctness; wanted to get attention on it early. ZetaSQL uses a very specific format for this, which roughly matches the struct in LocalTime: https://github.com/google/zetasql/blob/79adcd0fe227173e68ed7aa88f580a691ebe82c2/zetasql/public/civil_time.h#L85 It appears the Long base type is an offset in nanoseconds. Is that conversion lossless? Is it expensive?
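The losslessness question above can be checked with a small sketch. Assuming the Long is read as a plain nanoseconds-since-midnight offset (the comment says "an offset in Nanoseconds"; the actual ZetaSQL `civil_time` struct packs bit fields, so this encoding is an assumption), the round trip from time-of-day to Long and back is exact, and the maximum offset (just under 86,400 × 10⁹ ≈ 8.64 × 10¹³) fits comfortably in a signed 64-bit value.

```python
NANOS_PER_SECOND = 10**9


def time_to_nanos(hour, minute, second, nanos):
    """Encode a civil time-of-day as a nanosecond offset from midnight.
    A sketch of one plausible Long encoding, not ZetaSQL's bit-field packing."""
    return ((hour * 60 + minute) * 60 + second) * NANOS_PER_SECOND + nanos


def nanos_to_time(offset):
    """Decode a nanosecond offset back into (hour, minute, second, nanos)."""
    nanos = offset % NANOS_PER_SECOND
    seconds = offset // NANOS_PER_SECOND
    return (seconds // 3600, (seconds % 3600) // 60, seconds % 60, nanos)
```

Since both directions are pure integer arithmetic (no floating point, no division remainder loss), the conversion is lossless at nanosecond granularity and cheap: a handful of multiplications and divisions per value.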
[GitHub] [beam] robinyqiu commented on pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL
robinyqiu commented on pull request #12054: URL: https://github.com/apache/beam/pull/12054#issuecomment-649145158 retest this please
[GitHub] [beam] udim commented on pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()
udim commented on pull request #12009: URL: https://github.com/apache/beam/pull/12009#issuecomment-649145031 retest this please
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r445242837

## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ##

```java
@@ -198,6 +213,154 @@
  * ...
  * }
  *
+ * Read from Kafka as a {@link DoFn}
+ *
+ * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link
+ * KafkaSourceDescription} as input and outputs a PCollection of {@link KafkaRecord}. The core
+ * implementation is based on {@code SplittableDoFn}. For more details about the concept of {@code
+ * SplittableDoFn}, please refer to the <a href="https://beam.apache.org/blog/splittable-do-fn/">blog
+ * post</a> and the <a href="https://s.apache.org/beam-fn-api">design doc</a>. The major difference
+ * from {@link KafkaIO.Read} is that {@link ReadAll} doesn't require source descriptions (e.g.,
+ * {@link KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link
+ * KafkaIO.Read#getStartReadTime()}, etc.) during pipeline construction time. Instead, the
+ * pipeline can populate these source descriptions during runtime. For example, the pipeline can
+ * query Kafka topics from a BigQuery table and read these topics via {@link ReadAll}.
+ *
+ * Common Kafka Consumer Configurations
+ *
+ * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}:
+ *
+ *   {@link ReadAll#getConsumerConfig()} is the same as {@link KafkaIO.Read#getConsumerConfig()}.
+ *   {@link ReadAll#getConsumerFactoryFn()} is the same as {@link KafkaIO.Read#getConsumerFactoryFn()}.
+ *   {@link ReadAll#getOffsetConsumerConfig()} is the same as {@link KafkaIO.Read#getOffsetConsumerConfig()}.
+ *   {@link ReadAll#getKeyCoder()} is the same as {@link KafkaIO.Read#getKeyCoder()}.
+ *   {@link ReadAll#getValueCoder()} is the same as {@link KafkaIO.Read#getValueCoder()}.
+ *   {@link ReadAll#getKeyDeserializerProvider()} is the same as {@link KafkaIO.Read#getKeyDeserializerProvider()}.
+ *   {@link ReadAll#getValueDeserializerProvider()} is the same as {@link KafkaIO.Read#getValueDeserializerProvider()}.
+ *   {@link ReadAll#isCommitOffsetEnabled()} means the same as {@link KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}.
+ *
+ * For example, to create a basic {@link ReadAll} transform:
+ *
+ * {@code
+ * pipeline
+ *   .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1))))
+ *   .apply(KafkaIO.readAll()
+ *     .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *     .withKeyDeserializer(LongDeserializer.class)
+ *     .withValueDeserializer(StringDeserializer.class));
+ * }
+ *
+ * Note that the {@code bootstrapServers} can also be populated from {@link KafkaSourceDescription}:
+ *
+ * {@code
+ * pipeline
+ *   .apply(Create.of(
+ *     KafkaSourceDescription.of(
+ *       new TopicPartition("topic", 1),
+ *       null,
+ *       null,
+ *       ImmutableList.of("broker_1:9092", "broker_2:9092"))))
+ *   .apply(KafkaIO.readAll()
+ *     .withKeyDeserializer(LongDeserializer.class)
+ *     .withValueDeserializer(StringDeserializer.class));
+ * }
+ *
+ * Configurations of {@link ReadAll}
+ *
+ * Except for the configurations of the Kafka consumer, there are some other configurations which
+ * are related to processing records.
+ *
+ * {@link ReadAll#commitOffsets()} enables committing offsets after processing the record. Note
+ * that if {@code isolation.level} is set to "read_committed" or {@link
+ * ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG} is set in the consumer config, then {@link
+ * ReadAll#commitOffsets()} will be ignored.
+ *
+ * {@link ReadAll#withExtractOutputTimestampFn(SerializableFunction)} asks for a function which
+ * takes a {@link KafkaRecord} as input and outputs the output timestamp. This function is used to
+ * produce an output timestamp per {@link KafkaRecord}. There are three built-in types: {@link
+ * ReadAll#withProcessingTime()}, {@link ReadAll#withCreateTime()} and {@link
+ * ReadAll#withLogAppendTime()}.
+ *
+ * For example, to create a {@link ReadAll} with these configurations:
+ *
+ * {@code
+ * pipeline
+ *   .apply(Create.of(
+ *     KafkaSourceDescription.of(
+ *       new TopicPartition("topic", 1),
+ *       null,
+ *       null,
+ *       ImmutableList.of("broker_1:9092", "broker_2:9092"))))
+ *   .apply(KafkaIO.readAll()
+ *     .withKeyDeserializer(LongDeserializer.class)
+ *     .withValueDeserializer(StringDeserializer.class)
+ *     .withProcessingTime()
+ *     .commitOffsets());
+ * }
+ *
+ * Read from {@link KafkaSourceDescription}
+ *
+ * {@link ReadFromKafkaDoFn} implements the logic of reading from Kafka. The element is a {@link
+ * KafkaSourceDescription}, and the restriction is an {@link OffsetRange} which represents record
+ * offset. A {@link GrowableOffsetRangeTracker} is used to track an {@link OffsetRange} ended with
+ * {@code Long.MAX_VALUE}. For a finite range, a {@link OffsetRangeTracker} is creat
```
[GitHub] [beam] pabloem commented on pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs
pabloem commented on pull request #12082: URL: https://github.com/apache/beam/pull/12082#issuecomment-649140269 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r445236767 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.kafka.common.TopicPartition; +import org.joda.time.Instant; + +/** + * An AutoValue object which represents a Kafka source description. 
Note that this object should be + * encoded/decoded with equivalent {@link Schema} as a {@link Row} when crossing the wire. + */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class KafkaSourceDescription implements Serializable { Review comment: It seems like `Descriptor` makes more sense.
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r445235681 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -1051,33 +1261,341 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - - - private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class); - /** - * Returns a new config map which is merge of current config and updates. Verifies the updates do - * not includes ignored properties. + * A {@link PTransform} to read from Kafka. See {@link KafkaIO} for more information on usage and + * configuration. */ - private static Map updateKafkaProperties( - Map currentConfig, - Map ignoredProperties, - Map updates) { + @Experimental(Kind.PORTABILITY) + @AutoValue + public abstract static class ReadAll + extends PTransform, PCollection>> { + +private static final Logger LOG = LoggerFactory.getLogger(ReadAll.class); + +abstract Map getConsumerConfig(); + +@Nullable +abstract Map getOffsetConsumerConfig(); + +@Nullable +abstract DeserializerProvider getKeyDeserializerProvider(); + +@Nullable +abstract DeserializerProvider getValueDeserializerProvider(); + +@Nullable +abstract Coder getKeyCoder(); + +@Nullable +abstract Coder getValueCoder(); + +abstract SerializableFunction, Consumer> +getConsumerFactoryFn(); + +@Nullable +abstract SerializableFunction, Instant> getExtractOutputTimestampFn(); + +@Nullable +abstract SerializableFunction> +getCreateWatermarkEstimatorFn(); + +abstract boolean isCommitOffsetEnabled(); + +@Nullable +abstract TimestampPolicyFactory getTimestampPolicyFactory(); + +abstract ReadAll.Builder toBuilder(); + +@AutoValue.Builder +abstract static class Builder { + abstract ReadAll.Builder setConsumerConfig(Map config); + + abstract ReadAll.Builder setOffsetConsumerConfig( + Map offsetConsumerConfig); + + abstract ReadAll.Builder setConsumerFactoryFn( + SerializableFunction, Consumer> consumerFactoryFn); + + abstract 
ReadAll.Builder setKeyDeserializerProvider( + DeserializerProvider deserializerProvider); + + abstract ReadAll.Builder setValueDeserializerProvider( + DeserializerProvider deserializerProvider); + + abstract ReadAll.Builder setKeyCoder(Coder keyCoder); + + abstract ReadAll.Builder setValueCoder(Coder valueCoder); + + abstract ReadAll.Builder setExtractOutputTimestampFn( + SerializableFunction, Instant> fn); + + abstract ReadAll.Builder setCreateWatermarkEstimatorFn( + SerializableFunction> fn); + + abstract ReadAll.Builder setCommitOffsetEnabled(boolean commitOffsetEnabled); + + abstract ReadAll.Builder setTimestampPolicyFactory(TimestampPolicyFactory policy); + + abstract ReadAll build(); +} -for (String key : updates.keySet()) { +public static ReadAll read() { + return new AutoValue_KafkaIO_ReadAll.Builder() + .setConsumerFactoryFn(KafkaIOUtils.KAFKA_CONSUMER_FACTORY_FN) + .setConsumerConfig(KafkaIOUtils.DEFAULT_CONSUMER_PROPERTIES) + .setCommitOffsetEnabled(false) + .build() + .withProcessingTime() + .withMonotonicallyIncreasingWatermarkEstimator(); +} + +// Note that if the bootstrapServers is set here but also populated with the element, the +// element +// will override the bootstrapServers from the config. 
+public ReadAll withBootstrapServers(String bootstrapServers) { + return withConsumerConfigUpdates( + ImmutableMap.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); +} + +public ReadAll withKeyDeserializerProvider(DeserializerProvider deserializerProvider) { + return toBuilder().setKeyDeserializerProvider(deserializerProvider).build(); +} + +public ReadAll withValueDeserializerProvider( +DeserializerProvider deserializerProvider) { + return toBuilder().setValueDeserializerProvider(deserializerProvider).build(); +} + +public ReadAll withKeyDeserializer(Class> keyDeserializer) { + return withKeyDeserializerProvider(LocalDeserializerProvider.of(keyDeserializer)); +} + +public ReadAll withValueDeserializer(Class> valueDeserializer) { + return withValueDeserializerProvider(LocalDeserializerProvider.of(valueDeserializer)); +} + +public ReadAll withKeyDeserializerAndCoder( +Class> keyDeserializer, Coder keyCoder) { + return withKeyDeserializer(keyDeserializer).toBuilder().setKeyCoder(keyCoder).build(); +} + +public ReadAll withValueDeserializerAndCoder( +Class> valueDeserializer, Coder valueCoder) { + return withValueDes
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r445226123 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.kafka.common.TopicPartition; +import org.joda.time.Instant; + +/** + * An AutoValue object which represents a Kafka source description. 
Note that this object should be + * encoded/decoded with equivalent {@link Schema} as a {@link Row} when crossing the wire. + */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class KafkaSourceDescription implements Serializable { + @SchemaFieldName("topic") + abstract String getTopic(); + + @SchemaFieldName("partition") + abstract Integer getPartition(); + + @SchemaFieldName("start_read_offset") + @Nullable + abstract Long getStartReadOffset(); + + @SchemaFieldName("start_read_time") + @Nullable + abstract Instant getStartReadTime(); + + @SchemaFieldName("bootstrapServers") + @Nullable + abstract List getBootStrapServers(); + + private TopicPartition topicPartition = null; + + public TopicPartition getTopicPartition() { Review comment: Thanks!
[GitHub] [beam] boyuanzz commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r445225947 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -906,19 +1082,91 @@ public void setValueDeserializer(String valueDeserializer) { Coder keyCoder = getKeyCoder(coderRegistry); Coder valueCoder = getValueCoder(coderRegistry); - // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. - Unbounded> unbounded = - org.apache.beam.sdk.io.Read.from( - toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource()); + // The Read will be expanded into SDF transform when "beam_fn_api" is enabled and + // "beam_fn_api_use_deprecated_read" is not enabled. + if (!ExperimentalOptions.hasExperiment(input.getPipeline().getOptions(), "beam_fn_api") + || ExperimentalOptions.hasExperiment( + input.getPipeline().getOptions(), "beam_fn_api_use_deprecated_read")) { +// Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set. 
+Unbounded> unbounded = +org.apache.beam.sdk.io.Read.from( + toBuilder().setKeyCoder(keyCoder).setValueCoder(valueCoder).build().makeSource()); + +PTransform>> transform = unbounded; + +if (getMaxNumRecords() < Long.MAX_VALUE || getMaxReadTime() != null) { + transform = + unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords()); +} - PTransform>> transform = unbounded; +return input.getPipeline().apply(transform); + } + ReadAll readTransform = + ReadAll.read() + .withConsumerConfigOverrides(getConsumerConfig()) + .withOffsetConsumerConfigOverrides(getOffsetConsumerConfig()) + .withConsumerFactoryFn(getConsumerFactoryFn()) + .withKeyDeserializerProvider(getKeyDeserializerProvider()) + .withValueDeserializerProvider(getValueDeserializerProvider()) + .withManualWatermarkEstimator() + .withTimestampPolicyFactory(getTimestampPolicyFactory()); + if (isCommitOffsetsInFinalizeEnabled()) { +readTransform = readTransform.commitOffsets(); + } + PCollection output = + input + .getPipeline() + .apply(Impulse.create()) + .apply(ParDo.of(new GenerateKafkaSourceDescription(this))); + try { + output.setCoder(KafkaSourceDescription.getCoder(input.getPipeline().getSchemaRegistry())); Review comment: It works with `setSchema`, but I want to make it explicit because it's possible that a user writes a DoFn which produces `KafkaSourceDescription`.
[GitHub] [beam] tysonjh commented on pull request #12020: [BEAM-9066] Add javax.annotation-api dependency.
tysonjh commented on pull request #12020: URL: https://github.com/apache/beam/pull/12020#issuecomment-649124368 retest this please
[GitHub] [beam] tysonjh commented on pull request #12020: [BEAM-9066] Add javax.annotation-api dependency.
tysonjh commented on pull request #12020: URL: https://github.com/apache/beam/pull/12020#issuecomment-649121635 > Ah, I'm wrong. [CDDL is Category B](https://www.apache.org/legal/resolved.html#category-b) Should I update the comments to be more clear? Perhaps having it mention GPL 2.0 and the prohibited link is confusing and I should just reference CDDL?
[GitHub] [beam] youngoli merged pull request #12081: [BEAM-9935] Counting empty split points as "any" in Go DataSource.
youngoli merged pull request #12081: URL: https://github.com/apache/beam/pull/12081
[GitHub] [beam] allenpradeep edited a comment on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'
allenpradeep edited a comment on pull request #11570: URL: https://github.com/apache/beam/pull/11570#issuecomment-649119157 Can we merge this PR? I would want to send out a PR to count bytes written to Spanner, and that would be dependent on this.
[GitHub] [beam] allenpradeep commented on pull request #11570: [BEAM-10047] Merge the stages 'Gather and Sort' and 'Create Batches'
allenpradeep commented on pull request #11570: URL: https://github.com/apache/beam/pull/11570#issuecomment-649119157 Can we merge this patch? I have a patch to count bytes written to Spanner, which would be dependent on this.
[GitHub] [beam] kennknowles commented on pull request #12020: [BEAM-9066] Add javax.annotation-api dependency.
kennknowles commented on pull request #12020: URL: https://github.com/apache/beam/pull/12020#issuecomment-649111027 Ah, I'm wrong. [CDDL is Category B](https://www.apache.org/legal/resolved.html#category-b)
[GitHub] [beam] kennknowles commented on pull request #12020: [BEAM-9066] Add javax.annotation-api dependency.
kennknowles commented on pull request #12020: URL: https://github.com/apache/beam/pull/12020#issuecomment-649110871 Neither CDDL nor GPL 2.0 would be allowable. But I believe [this library's license](https://github.com/javaee/javax.annotation/blob/83417807ad402ee1022c0307208d4510c80c68b6/LICENSE#L743) is [GPL 2.0 with the classpath exception](https://openjdk.java.net/legal/gplv2+ce.html), which is noted as an exception under the [Category X licenses](https://www.apache.org/legal/resolved.html#category-x).
[GitHub] [beam] lukecwik commented on pull request #11406: [BEAM-9748] Refactor Reparallelize as an alternative Reshuffle implementation
lukecwik commented on pull request #11406: URL: https://github.com/apache/beam/pull/11406#issuecomment-649110837 ping?
[GitHub] [beam] kennknowles commented on pull request #11886: [BEAM-8647] delete .mailmap
kennknowles commented on pull request #11886: URL: https://github.com/apache/beam/pull/11886#issuecomment-649108755 I see now that the thread link that Ismaël provided is the root discussion that we should follow to resolve this. I want to emphasize that I would, of course, also support individual requests to remove information from this file. Anyone removing their information should be warned that it does not protect the information or make it less public.
[GitHub] [beam] robinyqiu commented on pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL
robinyqiu commented on pull request #12054: URL: https://github.com/apache/beam/pull/12054#issuecomment-649107647 Oops, `BeamSqlDateFunctionsIntegrationTest.testDateTimeFunctions_currentTime` seems to be failing. Is it failing on your machine as well? That could be a flaky test.
[GitHub] [beam] lukecwik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns
lukecwik commented on a change in pull request #11808: URL: https://github.com/apache/beam/pull/11808#discussion_r445207524 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ## @@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform public static boolean usesStateOrTimers(AppliedPTransform transform) throws IOException { ParDoPayload payload = getParDoPayload(transform); -return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0; +return payload.getStateSpecsCount() > 0 +|| payload.getTimerFamilySpecsCount() > 0 +|| payload.getRequiresTimeSortedInput(); Review comment: I think modelling how something is executed, and keeping that separate from the user's definition of their transform, is important (so using the PTransformMatcher makes a lot of sense since not all runners will use state). For example, a runner can sort using their shuffle implementation where the timestamp is the sort key (this is something that Dataflow does for some batch pipelines) and other runners may choose to do this as well.
[GitHub] [beam] lukecwik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns
lukecwik commented on a change in pull request #11808: URL: https://github.com/apache/beam/pull/11808#discussion_r445207524 ## File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ## @@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform public static boolean usesStateOrTimers(AppliedPTransform transform) throws IOException { ParDoPayload payload = getParDoPayload(transform); -return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0; +return payload.getStateSpecsCount() > 0 +|| payload.getTimerFamilySpecsCount() > 0 +|| payload.getRequiresTimeSortedInput(); Review comment: I think modelling how something is executed using the PTransformMatcher makes a lot of sense since not all runners will use state. For example, a runner can sort using their shuffle implementation where the timestamp is the sort key (this is something that Dataflow does for some batch pipelines) and other runners may choose to do this as well.
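The shuffle-based sorting idea above can be illustrated with a minimal plain-Java sketch. This is not Beam's or Dataflow's actual runner code; the `Timestamped` record and method names are illustrative assumptions. It just shows the core operation: buffering elements and re-emitting them with the event timestamp as the sort key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TimeSortedBuffer {
  // Minimal stand-in for a timestamped element; not part of the Beam API.
  record Timestamped(long timestampMillis, String value) {}

  // Sort buffered elements by event timestamp, as a runner's shuffle could do
  // when the timestamp is used as the sort key.
  static List<Timestamped> sortByEventTime(List<Timestamped> buffered) {
    List<Timestamped> sorted = new ArrayList<>(buffered);
    sorted.sort(Comparator.comparingLong(Timestamped::timestampMillis));
    return sorted;
  }

  public static void main(String[] args) {
    List<Timestamped> in = List.of(
        new Timestamped(300, "c"),
        new Timestamped(100, "a"),
        new Timestamped(200, "b"));
    for (Timestamped t : sortByEventTime(in)) {
      System.out.println(t.timestampMillis() + " " + t.value());
    }
  }
}
```

A runner using this strategy never needs per-key sorted state: the sort happens once, as part of materializing the shuffled input.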
[GitHub] [beam] robinyqiu commented on a change in pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL
robinyqiu commented on a change in pull request #12054: URL: https://github.com/apache/beam/pull/12054#discussion_r444548460 ## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/logicaltypes/Time.java ## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.logicaltypes; + +import java.time.LocalTime; +import org.apache.beam.sdk.schemas.Schema; + +public class Time implements Schema.LogicalType { Review comment: Please add class-level javadoc for this (see `DATE` for example.) ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java ## @@ -315,7 +317,7 @@ private static Expression castOutput(Expression value, FieldType toType) { private static Expression castOutputTime(Expression value, FieldType toType) { Expression valueDateTime = value; -// First, convert to millis (except for DATE type) +// First, convert to millis (except for DATE/TIME type) Review comment: Seems we can combine the first and second step now (and update the comments). The code will look much simpler that way. 
## File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java ## @@ -2457,6 +2458,256 @@ public void testDateFromUnixInt64() { pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); } + / + // TIME type tests + / + + @Test + public void testTimeLiteral() { +String sql = "SELECT TIME '15:30:00'"; + +ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); +BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); +PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + +PAssert.that(stream) +.containsInAnyOrder( +Row.withSchema(Schema.builder().addLogicalTypeField("f_time", SqlTypes.TIME).build()) +.addValues(LocalTime.of(15, 30, 0, 0)) +.build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + @Test + public void testTimeColumn() { +String sql = "SELECT FORMAT_TIME('%T', time_field) FROM table_with_time"; + +ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); +BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); +PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + +PAssert.that(stream) +.containsInAnyOrder( + Row.withSchema(Schema.builder().addStringField("f_time_str").build()) +.addValues("15:30:00") +.build(), + Row.withSchema(Schema.builder().addStringField("f_time_str").build()) +.addValues("23:35:59") +.build()); + pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES)); + } + + // TODO[BEAM-9166]: Add a test for CURRENT_TIME function ("SELECT CURRENT_TIME()") + + @Test + public void testExtractTime() { +String sql = +"SELECT " ++ "EXTRACT(HOUR FROM TIME '15:30:35') as hour, " ++ "EXTRACT(MINUTE FROM TIME '15:30:35') as minute, " ++ "EXTRACT(SECOND FROM TIME '15:30:35') as second, " ++ "EXTRACT(MILLISECOND FROM TIME '15:30:35') as millisecond, " 
++ "EXTRACT(MICROSECOND FROM TIME '15:30:35') as microsecond "; + +ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config); +BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql); +PCollection stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode); + +final Schema schema = +Schema.builder() +.addField("hour", FieldType.INT64) +.addField("minute", FieldType.INT64) +.addField("second", FieldType.INT64) +.addField("millisecond", FieldType.INT64) +
[GitHub] [beam] kennknowles edited a comment on pull request #11886: [BEAM-8647] delete .mailmap
kennknowles edited a comment on pull request #11886: URL: https://github.com/apache/beam/pull/11886#issuecomment-649105139 FWIW I am not sure everyone on this PR is familiar with the file. It is a config file to help git pretty print the log. See the "List Of Contributors" on https://beam.apache.org/blog/beam-2.22.0/. Without this file, the list will have duplicate entries like `Kenneth Knowles ` and `Kenn Knowles ` and possibly some that are just `k...@google.com` etc. The file is a hint to git to de-dupe these when printing the log. If anything, using GitHub APIs is worse, because it associates actual registered accounts that are connected to other information. All this file does is associate strings that already exist in the log. But, as noted, that information is also all public.
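For readers unfamiliar with the file being discussed: each line of a `.mailmap` maps a name/email variant that appears in commits to a canonical identity, and git consults it when printing the log. A sketch of the format (the names and addresses below are made-up examples, not entries from Beam's actual file):

```
# Canonical identity, followed by the commit email it replaces
Jane Doe <jane@example.org> <jdoe@old-laptop.example>

# Canonical pair replacing a specific name/email pair found in commits
Jane Doe <jane@example.org> J. Doe <jane.doe@corp.example>
```

With these entries, `git shortlog -se` attributes all four spellings to the single canonical `Jane Doe <jane@example.org>`.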
[GitHub] [beam] lukecwik commented on a change in pull request #11808: [BEAM-10072] Fix RequiresTimeSortedInput for stateless DoFns
lukecwik commented on a change in pull request #11808: URL: https://github.com/apache/beam/pull/11808#discussion_r445205831 ## File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ## @@ -2572,6 +2572,49 @@ public void testTwoRequiresTimeSortedInputWithLateData() { false); } +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, + UsesRequiresTimeSortedInput.class, + UsesStrictTimerOrdering.class, + UsesTestStream.class +}) +public void testRequiresTimeSortedInputWithStatelessDoFn() { + // generate list long enough to rule out random shuffle in sorted order + int numElements = 1000; + List eventStamps = + LongStream.range(0, numElements) + .mapToObj(i -> numElements - i) + .collect(Collectors.toList()); + TestStream.Builder stream = TestStream.create(VarLongCoder.of()); + for (Long stamp : eventStamps) { +stream = stream.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp))); + } + testTimeSortedInputStateless( + numElements, pipeline.apply(stream.advanceWatermarkToInfinity())); +} + +@Test +@Category({ + ValidatesRunner.class, + UsesStatefulParDo.class, Review comment: makes sense
[GitHub] [beam] chamikaramj merged pull request #11911: [BEAM-10186] Sends an empty response to the runner instead of failing
chamikaramj merged pull request #11911: URL: https://github.com/apache/beam/pull/11911
[GitHub] [beam] kennknowles commented on pull request #11886: [BEAM-8647] delete .mailmap
kennknowles commented on pull request #11886: URL: https://github.com/apache/beam/pull/11886#issuecomment-649102442 This file is very useful for producing the release notes and integrating with various git stats tools. It is just a reformatting of information available in the git log. I will read the linked thread.
[GitHub] [beam] youngoli commented on pull request #12081: [BEAM-9935] Counting empty split points as "any" in Go DataSource.
youngoli commented on pull request #12081: URL: https://github.com/apache/beam/pull/12081#issuecomment-649098051 I forgot that you could call len on nil slices; that's much better. Done.
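In Go, `len` of a nil slice is 0, so a single length check covers both the nil and the empty case. A Java analog of the same guard (a hypothetical helper for illustration, not the actual DataSource code):

```java
import java.util.Collections;
import java.util.List;

public class SplitGuard {
    // Treat "no allowlist" (null) and "empty allowlist" identically:
    // both mean any split point is acceptable.
    static boolean allowsAnySplit(List<Long> allowedSplits) {
        return allowedSplits == null || allowedSplits.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(allowsAnySplit(null));                          // true
        System.out.println(allowsAnySplit(Collections.emptyList()));       // true
        System.out.println(allowsAnySplit(Collections.singletonList(3L))); // false
    }
}
```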
[GitHub] [beam] lukecwik commented on pull request #12051: [BEAM-10298] beam-linkage-check.sh should not swallow errors
lukecwik commented on pull request #12051: URL: https://github.com/apache/beam/pull/12051#issuecomment-649095296 Run Java PreCommit
[GitHub] [beam] ZijieSong946 commented on pull request #12054: [BEAM-10219] Support ZetaSQL TIME functions in BeamSQL
ZijieSong946 commented on pull request #12054: URL: https://github.com/apache/beam/pull/12054#issuecomment-649093285 Bug fixed. Thanks a lot.
[GitHub] [beam] pabloem opened a new pull request #12082: Standardizing BigQuery job names in Beam Python and Java SDKs
pabloem opened a new pull request #12082: URL: https://github.com/apache/beam/pull/12082 **Please** add a meaningful description for your change here Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[GitHub] [beam] youngoli commented on pull request #12081: [BEAM-9935] Counting empty split points as "any" in Go DataSource.
youngoli commented on pull request #12081: URL: https://github.com/apache/beam/pull/12081#issuecomment-649086523 R: @lostluck CC: @lukecwik
[GitHub] [beam] youngoli opened a new pull request #12081: [BEAM-9935] Counting empty split points as "any" in Go DataSource.
youngoli opened a new pull request #12081: URL: https://github.com/apache/beam/pull/12081 Fix a small oversight in the split code. I check for nil to mean any splits are ok, but if the list is empty instead of nil it should have the same behavior. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[GitHub] [beam] ibzib merged pull request #12006: [BEAM-10257] Add option defaults for Spark Python tests
ibzib merged pull request #12006: URL: https://github.com/apache/beam/pull/12006
[GitHub] [beam] TheNeuralBit commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
TheNeuralBit commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r445181264 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.kafka.common.TopicPartition; +import org.joda.time.Instant; + +/** + * An AutoValue object which represents a Kafka source description.
Note that this object should be + * encoded/decoded with equivalent {@link Schema} as a {@link Row} when crossing the wire. + */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class KafkaSourceDescription implements Serializable { + @SchemaFieldName("topic") + abstract String getTopic(); + + @SchemaFieldName("partition") + abstract Integer getPartition(); + + @SchemaFieldName("start_read_offset") + @Nullable + abstract Long getStartReadOffset(); + + @SchemaFieldName("start_read_time") + @Nullable + abstract Instant getStartReadTime(); + + @SchemaFieldName("bootstrapServers") + @Nullable + abstract List getBootStrapServers(); + + private TopicPartition topicPartition = null; + + public TopicPartition getTopicPartition() { Review comment: This will get pulled into the generated schema which I don't think is your intention. You should change the name so it's not a getter, or add `@SchemaIgnore`
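The reviewer's suggestion can be sketched as follows. This assumes Beam's `@SchemaIgnore` annotation from `org.apache.beam.sdk.schemas.annotations`; the class body is abbreviated and is a sketch of the suggested fix, not the PR's actual code:

```java
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.kafka.common.TopicPartition;

public abstract class KafkaSourceDescription {
  abstract String getTopic();
  abstract Integer getPartition();

  // This looks like a getter, so AutoValueSchema would otherwise infer a
  // schema field for it; @SchemaIgnore keeps the derived value out of the
  // generated Row schema. Renaming it (e.g. toTopicPartition) would work too.
  @SchemaIgnore
  public TopicPartition getTopicPartition() {
    return new TopicPartition(getTopic(), getPartition());
  }
}
```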
[GitHub] [beam] annaqin418 commented on pull request #12006: [BEAM-10257] Add option defaults for Spark Python tests
annaqin418 commented on pull request #12006: URL: https://github.com/apache/beam/pull/12006#issuecomment-649070534 LGTM!
[GitHub] [beam] amaliujia commented on a change in pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL
amaliujia commented on a change in pull request #12079: URL: https://github.com/apache/beam/pull/12079#discussion_r445169828 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java ## @@ -383,4 +392,30 @@ public Long extractOutput(Long accum) { return accum; } } + + static class BitAnd extends CombineFn { +@Override +public Long createAccumulator() { + return -1L; Review comment: OK, I'll trigger that pipeline run and then wait. Thanks, that's a nice catch.
[GitHub] [beam] Imfuyuwei commented on a change in pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL
Imfuyuwei commented on a change in pull request #12079: URL: https://github.com/apache/beam/pull/12079#discussion_r445167391 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java ## @@ -383,4 +392,30 @@ public Long extractOutput(Long accum) { return accum; } } + + static class BitAnd extends CombineFn { +@Override +public Long createAccumulator() { + return -1L; Review comment: Thanks! BTW I want to mention that I also added 1 line at the end of the previous testBitOrFunction(). I noticed that without this line, the previous bit_or test would always pass no matter what expected result I set, which made it an invalid test. It would be good if you could take a look.
[GitHub] [beam] ibzib commented on pull request #12006: [BEAM-10257] Add option defaults for Spark Python tests
ibzib commented on pull request #12006: URL: https://github.com/apache/beam/pull/12006#issuecomment-649064265 Run Python Spark ValidatesRunner
[GitHub] [beam] amaliujia merged pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL
amaliujia merged pull request #12079: URL: https://github.com/apache/beam/pull/12079
[GitHub] [beam] amaliujia commented on a change in pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL
amaliujia commented on a change in pull request #12079: URL: https://github.com/apache/beam/pull/12079#discussion_r445163189 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java ## @@ -383,4 +392,30 @@ public Long extractOutput(Long accum) { return accum; } } + + static class BitAnd extends CombineFn { +@Override +public Long createAccumulator() { + return -1L; Review comment: That's a good point 👍 I should go back to re-visit CMU 15213 course slides.
[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL
amaliujia commented on a change in pull request #12073: URL: https://github.com/apache/beam/pull/12073#discussion_r445150359 ## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMatchRelTest.java ## @@ -0,0 +1,50 @@ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import org.apache.beam.sdk.extensions.sql.SqlTransform; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.junit.Test; +import org.apache.beam.sdk.schemas.Schema; + +public class BeamMatchRelTest { + + public static final TestPipeline pipeline = TestPipeline.create(); + + @Test + public void MatchLogicalPlanTest() { +Schema schemaType = Schema.builder() +.addInt32Field("id") +.addStringField("name") +.addInt32Field("proctime") +.build(); + +PCollection input = +pipeline.apply( +Create.of( + Row.withSchema(schemaType).addValue(1).addValue("a").addValue(1).build()) +.withRowSchema(schemaType)); + +String sql = "SELECT T.aid, T.bid, T.cid " + +"FROM PCOLLECTION " + +"MATCH_RECOGNIZE (" + +"PARTITION BY id " + +"ORDER BY proctime " + +"MEASURES " + +"A.id AS aid, " + +"B.id AS bid, " + +"C.id AS cid " + +"PATTERN (A B C) " + +"DEFINE " + +"A AS name = 'a', " + +"B AS name = 'b', " + +"C AS name = 'c' " + +") AS T"; + +PCollection result = input.apply(SqlTransform.query(sql)); Review comment: I see. Yes I think this test is valid. It tests if the query can be compiled, which includes the BeamMatchRel. 
[GitHub] [beam] lukecwik commented on a change in pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
lukecwik commented on a change in pull request #11749: URL: https://github.com/apache/beam/pull/11749#discussion_r445087927 ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java ## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.kafka; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.kafka.common.TopicPartition; +import org.joda.time.Instant; + +/** + * An AutoValue object which represents a Kafka source description. 
Note that this object should be + * encoded/decoded with equivalent {@link Schema} as a {@link Row} when crossing the wire. Review comment: ```suggestion * Represents a Kafka source description. * * Note that this object should be encoded/decoded with its corresponding {@link #getCoder schema coder}. ``` ## File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ## @@ -198,6 +213,154 @@ *... * } * + * Read from Kafka as a {@link DoFn} + * + * {@link ReadAll} is the {@link PTransform} that takes a PCollection of {@link + * KafkaSourceDescription} as input and outputs a PCollection of {@link KafkaRecord}. The core + * implementation is based on {@code SplittableDoFn}. For more details about the concept of {@code + * SplittableDoFn}, please refer to the https://beam.apache.org/blog/splittable-do-fn/";>blog post and https://s.apache.org/beam-fn-api";>design doc. The major difference from {@link + * KafkaIO.Read} is, {@link ReadAll} doesn't require source descriptions(e.g., {@link + * KafkaIO.Read#getTopicPartitions()}, {@link KafkaIO.Read#getTopics()}, {@link + * KafkaIO.Read#getStartReadTime()}, etc.) during the pipeline construction time. Instead, the + * pipeline can populate these source descriptions during runtime. For example, the pipeline can + * query Kafka topics from BigQuery table and read these topics via {@link ReadAll}. + * + * Common Kafka Consumer Configurations + * + * Most Kafka consumer configurations are similar to {@link KafkaIO.Read}: + * + * + * {@link ReadAll#getConsumerConfig()} is the same as {@link + * KafkaIO.Read#getConsumerConfig()}. + * {@link ReadAll#getConsumerFactoryFn()} is the same as {@link + * KafkaIO.Read#getConsumerFactoryFn()}. + * {@link ReadAll#getOffsetConsumerConfig()} is the same as {@link + * KafkaIO.Read#getOffsetConsumerConfig()}. + * {@link ReadAll#getKeyCoder()} is the same as {@link KafkaIO.Read#getKeyCoder()}. 
+ * {@link ReadAll#getValueCoder()} is the same as {@link KafkaIO.Read#getValueCoder()}. + * {@link ReadAll#getKeyDeserializerProvider()} is the same as {@link + * KafkaIO.Read#getKeyDeserializerProvider()}. + * {@link ReadAll#getValueDeserializerProvider()} is the same as {@link + * KafkaIO.Read#getValueDeserializerProvider()}. + * {@link ReadAll#isCommitOffsetEnabled()} means the same as {@link + * KafkaIO.Read#isCommitOffsetsInFinalizeEnabled()}. + * + * + * For example, to create a basic {@link ReadAll} transform: + * + * {@code + * pipeline + * .apply(Create.of(KafkaSourceDescription.of(new TopicPartition("topic", 1))) + * .apply(KafkaIO.readAll() + * .withBootstrapServers("broker_1:9092,broker_2:9092") + * .withKeyDeserializer(LongDeserializer.class). + * .withValueDeserializer(StringDeserializer.class)); + * + * Note that the {@code bootstrapServers} can also be populated from {@link KafkaSourceDescription}: + * pipeline Review comment: ```suggestion * {@code
[GitHub] [beam] Imfuyuwei commented on a change in pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL
Imfuyuwei commented on a change in pull request #12079: URL: https://github.com/apache/beam/pull/12079#discussion_r445160365 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java ## @@ -383,4 +392,30 @@ public Long extractOutput(Long accum) { return accum; } } + + static class BitAnd extends CombineFn { +@Override +public Long createAccumulator() { + return -1L; Review comment: Because -1L is represented as 64 bits of 1 in binary, while 1L has only one 1 at the least significant bit. In order to do the bit_and operation, I think the initial bit mask should consist of only 1s, so I use -1L. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
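The identity argument above can be checked directly: in two's complement, -1L is sixty-four 1-bits, so AND-ing any value with it is a no-op, whereas 1L zeroes every bit except the lowest. A minimal standalone sketch (an illustration, not the actual Beam CombineFn):

```java
public class BitAndIdentityCheck {
    public static void main(String[] args) {
        long[] samples = {0L, 1L, 42L, -7L, Long.MAX_VALUE, Long.MIN_VALUE};
        for (long x : samples) {
            // -1L == 0xFFFFFFFFFFFFFFFFL, so it is the identity element for &.
            if ((x & -1L) != x) {
                throw new AssertionError("-1L is not an identity for " + x);
            }
        }
        // 1L keeps only the least significant bit, so using it as the initial
        // accumulator would destroy information: 42 & 1 == 0, not 42.
        System.out.println(42L & 1L); // prints 0
        System.out.println(Long.toBinaryString(-1L).length()); // prints 64
    }
}
```

This is the same reasoning that makes 0 the right initial accumulator for BIT_OR: the identity of a fold must leave every input unchanged.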
[GitHub] [beam] boyuanzz commented on pull request #11749: [BEAM-9977] Implement ReadFromKafkaViaSDF
boyuanzz commented on pull request #11749: URL: https://github.com/apache/beam/pull/11749#issuecomment-649059330 Run Python2_PVR_Flink PreCommit
[GitHub] [beam] chamikaramj commented on a change in pull request #11911: [BEAM-10186] Sends an empty response to the runner instead of failing
chamikaramj commented on a change in pull request #11911: URL: https://github.com/apache/beam/pull/11911#discussion_r445156490 ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ## @@ -260,6 +260,46 @@ BundleProcessor get( } } + @Test + public void testTrySplitBeforeBundleDoesNotFail() { +ProcessBundleHandler handler = +new ProcessBundleHandler( +PipelineOptionsFactory.create(), +null, +beamFnDataClient, +null /* beamFnStateClient */, +null /* finalizeBundleHandler */, +ImmutableMap.of(), +new BundleProcessorCache()); + +handler.trySplit( Review comment: Done. ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ## @@ -260,6 +260,46 @@ BundleProcessor get( } } + @Test + public void testTrySplitBeforeBundleDoesNotFail() { +ProcessBundleHandler handler = +new ProcessBundleHandler( +PipelineOptionsFactory.create(), +null, +beamFnDataClient, +null /* beamFnStateClient */, +null /* finalizeBundleHandler */, +ImmutableMap.of(), +new BundleProcessorCache()); + +handler.trySplit( +BeamFnApi.InstructionRequest.newBuilder() +.setInstructionId("999L") +.setProcessBundleSplit( + BeamFnApi.ProcessBundleSplitRequest.newBuilder().setInstructionId("unknown-id")) +.build()); + } + + @Test + public void testProgressBeforeBundleDoesNotFail() throws Exception { +ProcessBundleHandler handler = +new ProcessBundleHandler( +PipelineOptionsFactory.create(), +null, +beamFnDataClient, +null /* beamFnStateClient */, +null /* finalizeBundleHandler */, +ImmutableMap.of(), +new BundleProcessorCache()); + +handler.progress( Review comment: Done. 
## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java ## @@ -341,53 +341,52 @@ private void createRunnerAndConsumersForPTransformRecursively( throws Exception { BundleProcessor bundleProcessor = bundleProcessorCache.find(request.getProcessBundleProgress().getInstructionId()); -if (bundleProcessor == null) { Review comment: Done.
[GitHub] [beam] amaliujia commented on a change in pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL
amaliujia commented on a change in pull request #12079: URL: https://github.com/apache/beam/pull/12079#discussion_r445151779 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java ## @@ -383,4 +392,30 @@ public Long extractOutput(Long accum) { return accum; } } + + static class BitAnd extends CombineFn { +@Override +public Long createAccumulator() { + return -1L; Review comment: -1L makes it a bit harder to read. Why not use 1L instead?
[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL
amaliujia commented on a change in pull request #12073: URL: https://github.com/apache/beam/pull/12073#discussion_r445150359 ## File path: sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMatchRelTest.java ## @@ -0,0 +1,50 @@ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import org.apache.beam.sdk.extensions.sql.SqlTransform; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.junit.Test; +import org.apache.beam.sdk.schemas.Schema; + +public class BeamMatchRelTest { + + public static final TestPipeline pipeline = TestPipeline.create(); + + @Test + public void MatchLogicalPlanTest() { +Schema schemaType = Schema.builder() +.addInt32Field("id") +.addStringField("name") +.addInt32Field("proctime") +.build(); + +PCollection input = +pipeline.apply( +Create.of( + Row.withSchema(schemaType).addValue(1).addValue("a").addValue(1).build()) +.withRowSchema(schemaType)); + +String sql = "SELECT T.aid, T.bid, T.cid " + +"FROM PCOLLECTION " + +"MATCH_RECOGNIZE (" + +"PARTITION BY id " + +"ORDER BY proctime " + +"MEASURES " + +"A.id AS aid, " + +"B.id AS bid, " + +"C.id AS cid " + +"PATTERN (A B C) " + +"DEFINE " + +"A AS name = 'a', " + +"B AS name = 'b', " + +"C AS name = 'c' " + +") AS T"; + +PCollection result = input.apply(SqlTransform.query(sql)); Review comment: I see. Yes, I think this test is valid. It tests whether the query can be compiled, which includes the BeamMatchRel.
[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL
amaliujia commented on a change in pull request #12073: URL: https://github.com/apache/beam/pull/12073#discussion_r445149049 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMatchRel.java ## @@ -0,0 +1,279 @@ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import org.apache.beam.sdk.extensions.sql.impl.SqlConversionException; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Match; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexVariable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Array; +import java.util.*; + +import static 
org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument; + +/** {@link BeamRelNode} to replace a {@link Match} node. */ +public class BeamMatchRel extends Match implements BeamRelNode { + +private static final Logger LOG = LoggerFactory.getLogger(BeamMatchRel.class); + +public BeamMatchRel( +RelOptCluster cluster, +RelTraitSet traitSet, +RelNode input, +RelDataType rowType, +RexNode pattern, +boolean strictStart, +boolean strictEnd, +Map patternDefinitions, +Map measures, +RexNode after, +Map> subsets, +boolean allRows, +List partitionKeys, +RelCollation orderKeys, +RexNode interval) { + +super(cluster, +traitSet, +input, +rowType, +pattern, +strictStart, +strictEnd, +patternDefinitions, +measures, +after, +subsets, +allRows, +partitionKeys, +orderKeys, +interval); + +} + +@Override +public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { +return BeamCostModel.FACTORY.makeTinyCost(); // return constant costModel for now +} + +@Override +public NodeStats estimateNodeStats(RelMetadataQuery mq) { +// a simple way of getting some estimate data +// to be examined further +NodeStats inputEstimate = BeamSqlRelUtils.getNodeStats(input, mq); +double numRows = inputEstimate.getRowCount(); +double winSize = inputEstimate.getWindow(); +double rate = inputEstimate.getRate(); + +return NodeStats.create(numRows, rate, winSize).multiply(0.5); +} + +@Override +public PTransform, PCollection> buildPTransform() { + +return new matchTransform(partitionKeys, orderKeys); +} + +private static class matchTransform extends PTransform, PCollection> { + +private final List parKeys; +private final RelCollation orderKeys; + +public matchTransform(List parKeys, RelCollation orderKeys) { +this.parKeys = parKeys; +this.orderKeys = orderKeys; +} + +@Override +public PCollection expand(PCollectionList pinput) { +checkArgument( +pinput.size() == 1, +"Wrong number of inputs for %s: %s", +BeamMatchRel.class.getSimpleName(), +pinput); 
+PCollection upstream = pinput.get(0); + +Schema collectionSchema = upstream.getSchema(); + +Schema.Builder schemaBuilder = new Schema.Builder(); +for (RexNode i : parKeys) { +RexVariable varNode = (RexVariable) i; +int index = Integer.parseInt(varNode.getName().substring(1)); // get rid of `$` +sche
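The `substring(1)` trick in the snippet above relies on Calcite's convention of naming input field references `$<index>`. A tiny illustration of that parsing (a hypothetical helper for clarity, not the PR's code):

```java
public class FieldRefParsing {
    // Parses a Calcite-style field reference name such as "$3" into its
    // zero-based field index; rejects names that lack the '$' prefix.
    static int fieldIndex(String refName) {
        if (!refName.startsWith("$")) {
            throw new IllegalArgumentException("not a field reference: " + refName);
        }
        return Integer.parseInt(refName.substring(1)); // drop the leading '$'
    }

    public static void main(String[] args) {
        System.out.println(fieldIndex("$0"));  // prints 0
        System.out.println(fieldIndex("$12")); // prints 12
    }
}
```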
[GitHub] [beam] mxm commented on a change in pull request #12062: [BEAM-10305] Let InMemoryBagUserStateFactory only use a single cache token
mxm commented on a change in pull request #12062: URL: https://github.com/apache/beam/pull/12062#discussion_r445147652 ## File path: runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/state/StateRequestHandlersTest.java ## @@ -59,4 +68,46 @@ public void testDelegatingStateHandlerThrowsWhenNotFound() throws Exception { StateRequestHandlers.delegateBasedUponType(new EnumMap<>(StateKey.TypeCase.class)) .handle(StateRequest.getDefaultInstance()); } + + @Test + public void testUserStateCacheTokenGeneration() { +ProcessBundleDescriptors.ExecutableProcessBundleDescriptor processBundleDescriptor = + Mockito.mock(ProcessBundleDescriptors.ExecutableProcessBundleDescriptor.class); +InMemoryBagUserStateFactory inMemoryBagUserStateFactory = new InMemoryBagUserStateFactory<>(); +StateRequestHandler stateRequestHandler = +StateRequestHandlers.forBagUserStateHandlerFactory( +processBundleDescriptor, inMemoryBagUserStateFactory); + +Iterable cacheTokens = +stateRequestHandler.getCacheTokens(); +assertThat(Iterables.size(cacheTokens), is(1)); + +BeamFnApi.ProcessBundleRequest.CacheToken cacheToken = Iterables.getOnlyElement(cacheTokens); +assertThat( +cacheToken.getUserState(), + is(BeamFnApi.ProcessBundleRequest.CacheToken.UserState.getDefaultInstance())); +assertThat(cacheToken.getToken(), is(notNullValue())); + +inMemoryBagUserStateFactory.forUserState( Review comment: Looks like this should be using the StateRequestHandler instead, will update.
[GitHub] [beam] amaliujia commented on a change in pull request #12073: [BEAM-9543] Support Match Recognition in Beam SQL
amaliujia commented on a change in pull request #12073: URL: https://github.com/apache/beam/pull/12073#discussion_r445147153 ## File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamMatchRel.java ## @@ -0,0 +1,279 @@ +package org.apache.beam.sdk.extensions.sql.impl.rel; + +import org.apache.beam.sdk.extensions.sql.impl.SqlConversionException; +import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel; +import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelFieldCollation; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Match; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode; +import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexVariable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Array; +import java.util.*; Review comment: 
Same. Import class by class.
[GitHub] [beam] mxm commented on pull request #12062: [BEAM-10305] Let InMemoryBagUserStateFactory only use a single cache token
mxm commented on pull request #12062: URL: https://github.com/apache/beam/pull/12062#issuecomment-649046917 Thanks for pointing out the duplication issue. The new version should properly fix it. Please have another look.
[GitHub] [beam] TheNeuralBit commented on pull request #12067: [BEAM-10308] Make component ID assignments consistent across PipelineContext instances
TheNeuralBit commented on pull request #12067: URL: https://github.com/apache/beam/pull/12067#issuecomment-649046980 I suppose another place where global state is ill-advised is when running tests, since we run all the tests in the same process and many of them create pipelines. Python precommits are failing because of this. It looks like the failing tests are making assertions about specific values of component keys which seems brittle, for example: ``` self = def test_cacheable_key_with_version_map(self): p = beam.Pipeline(interactive_runner.InteractiveRunner()) # pylint: disable=range-builtin-not-iterating init_pcoll = p | 'Init Create' >> beam.Create(range(10)) # It's normal that when executing, the pipeline object is a different # but equivalent instance from what user has built. The pipeline instrument # should be able to identify if the original instance has changed in an # interactive env while mutating the other instance for execution. The # version map can be used to figure out what the PCollection instances are # in the original instance and if the evaluation has changed since last # execution. p2 = beam.Pipeline(interactive_runner.InteractiveRunner()) # pylint: disable=range-builtin-not-iterating init_pcoll_2 = p2 | 'Init Create' >> beam.Create(range(10)) _, ctx = p2.to_runner_api(use_fake_coders=True, return_context=True) # The cacheable_key should use id(init_pcoll) as prefix even when # init_pcoll_2 is supplied as long as the version map is given. self.assertEqual( instr.cacheable_key( init_pcoll_2, instr.pcolls_to_pcoll_id(p2, ctx), {'ref_PCollection_PCollection_8': str(id(init_pcoll))}), > str(id(init_pcoll)) + '_ref_PCollection_PCollection_8') E AssertionError: '140176476148624_ref_PCollection_PCollection_4539' != '140176476499024_ref_PCollection_PCollection_8' E - 140176476148624_ref_PCollection_PCollection_4539 E ? - ^^ E + 140176476499024_ref_PCollection_PCollection_8 E ? 
^^^ ``` It's probably easier to do the work to make sure cached component IDs are scoped to an individual pipeline rather than fixing all of these tests (and a global cache shared across tests will be problematic anyway).
[GitHub] [beam] ibzib opened a new pull request #12080: Return throwing state handler for default API descriptor.
ibzib opened a new pull request #12080: URL: https://github.com/apache/beam/pull/12080 Follow-up from https://github.com/apache/beam/pull/12040/files#r445063845 Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
Post-Commit Tests Status (on master branch) Lang | SDK | Dataflow | Flink | Samza | Spark --- | --- | --- | --- | --- | --- Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)[![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBu
[GitHub] [beam] ibzib merged pull request #12040: Revert "Fix state handler for missing service descriptor."
ibzib merged pull request #12040: URL: https://github.com/apache/beam/pull/12040
[GitHub] [beam] mxm commented on a change in pull request #12062: [BEAM-10305] Let InMemoryBagUserStateFactory only use a single cache token
mxm commented on a change in pull request #12062: URL: https://github.com/apache/beam/pull/12062#discussion_r445131904 ## File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/InMemoryBagUserStateFactory.java ## @@ -37,13 +37,19 @@ /** * Holds user state in memory. Only one key is active at a time due to the GroupReduceFunction being * called once per key. Needs to be reset via {@code resetForNewKey()} before processing a new key. + * + * In case of any failures, this factory must be discarded. Otherwise, the contained state cache + * token would be reused which would corrupt the state cache. */ public class InMemoryBagUserStateFactory implements StateRequestHandlers.BagUserStateHandlerFactory { + private final ByteString cacheToken; + private List handlers; public InMemoryBagUserStateFactory() { +cacheToken = ByteString.copyFrom(UUID.randomUUID().toString().getBytes(Charsets.UTF_8)); Review comment: I see, makes sense. Let me update the PR.
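The fix under discussion — one cache token generated per factory instance and shared by every handler it creates — can be sketched as follows (simplified, hypothetical types, not the actual Beam classes):

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class SingleTokenFactory {
    // One token generated at construction time and shared by every handler this
    // factory creates; a new factory (e.g. after a failure) gets a fresh token,
    // which invalidates any state cached under the old one.
    private final byte[] cacheToken =
        UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8);

    byte[] tokenForNewHandler() {
        return cacheToken; // same token for all handlers of this instance
    }

    public static void main(String[] args) {
        SingleTokenFactory f = new SingleTokenFactory();
        // Two handlers from the same factory share one token...
        System.out.println(f.tokenForNewHandler() == f.tokenForNewHandler()); // true
        // ...while a new factory instance produces a different one.
        SingleTokenFactory g = new SingleTokenFactory();
        System.out.println(java.util.Arrays.equals(
            f.tokenForNewHandler(), g.tokenForNewHandler())); // false
    }
}
```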
[GitHub] [beam] JozoVilcek commented on pull request #12064: [BEAM-10284] Add option to pass configuration into ParquetIO.Sink
JozoVilcek commented on pull request #12064: URL: https://github.com/apache/beam/pull/12064#issuecomment-649031901 Hi, yes, the PR is to control the Parquet settings you mentioned. I was considering Map-based settings, but since the Parquet writer is Hadoop-based anyway now, I thought there was no reason not to expose the full config. I do not see any reason not to make it a Map if that is something you prefer. Now I think it is probably the better choice.
[GitHub] [beam] ibzib commented on pull request #11738: [BEAM-9936] Create SDK harness containers with Python 3.8
ibzib commented on pull request #11738: URL: https://github.com/apache/beam/pull/11738#issuecomment-649032247 I think this PR might have caused BEAM-10316, PTAL
[GitHub] [beam] Imfuyuwei opened a new pull request #12079: [BEAM-9890] Support BIT_AND aggregation function in Beam SQL and adde…
Imfuyuwei opened a new pull request #12079: URL: https://github.com/apache/beam/pull/12079 R: @amaliujia CC: @kennknowles Added support for BIT_AND aggregation function in Beam SQL. Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily: - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`). - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue. - [ ] Update `CHANGES.md` with noteworthy changes. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf). See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier). 
[GitHub] [beam] epicfaace commented on pull request #11824: [BEAM-10101] Add HttpIO / HttpFileSystem (Python)
epicfaace commented on pull request #11824:
URL: https://github.com/apache/beam/pull/11824#issuecomment-649021127

@pabloem anything you need from me on this one?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] ibzib opened a new pull request #12078: [BEAM-10315] Fix gradlew clean.
ibzib opened a new pull request #12078:
URL: https://github.com/apache/beam/pull/12078

R: @kamilwu

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).