RE: NanosInstant not being recognised by BigQueryIO.Write
Hi Cham,

I'm not registered as a Beam committer, so I don't believe I have access rights to raise JIRAs. Would you be kind enough to file a bug report on my behalf? I would suggest creating a simple end-to-end test that takes a protobuf-generated domain object and writes it to BigQuery using useBeamSchema() (I couldn't find such an example in the test suite). Briefly, the issues I have identified are:

1. As reported below, schema conversion does not recognise the protobuf-derived special types (e.g. NanosInstant). This results in an NPE.
2. The schema conversion code does not recognise fields in camel case (it works if field names are lower case).
3. The schema presented to BigQueryIO marks all fields as required, despite the fact that under proto3 all fields are optional.

Point 3 is more likely an issue with the protobuf-to-Beam schema generation; however, I mention it here as it would be useful to capture as part of an end-to-end test suite.

Kind regards,
Rob

From: Chamikara Jayalath [mailto:chamik...@google.com]
Sent: 15 July 2020 16:36
To: dev
Subject: Re: NanosInstant not being recognised by BigQueryIO.Write

This does sound like a bug, since Beam Schema to BigQuery type conversion [1] indeed does not consider org.apache.beam.sdk.schemas.logicaltypes.NanosInstant. Will you be able to file a JIRA, preferably with a test to reproduce this?

[1] https://github.com/apache/beam/blob/6c313eb84af6229f0a8a7a0b5890f18c5a8685e8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java#L202

Thanks,
Cham
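The NPE described in point 1 is the classic failure mode of an unguarded static map lookup: if a logical type has no entry, get() returns null and the first dereference of the result throws. A minimal stdlib-only sketch of that failure mode follows; the class, map contents, and type identifiers here are made up for illustration and are not Beam's actual BEAM_TO_BIG_QUERY_LOGICAL_MAPPING code.

```java
import java.util.HashMap;
import java.util.Map;

public class LogicalTypeLookup {
    // Illustrative stand-in for a static logical-type -> BigQuery-type map.
    static final Map<String, String> MAPPING = new HashMap<>();
    static {
        MAPPING.put("datetime", "TIMESTAMP");
        // Deliberately no entry for "nanos_instant".
    }

    // Mirrors the failure mode: get() returns null for an unmapped logical
    // type, and the unguarded dereference throws NullPointerException
    // rather than a diagnosable "unsupported type" error.
    static String toBigQueryType(String logicalTypeId) {
        return MAPPING.get(logicalTypeId).toUpperCase();
    }

    public static void main(String[] args) {
        System.out.println(toBigQueryType("datetime")); // TIMESTAMP
        try {
            toBigQueryType("nanos_instant");
        } catch (NullPointerException e) {
            System.out.println("NPE for unmapped logical type");
        }
    }
}
```

An end-to-end test of the kind suggested above would surface this immediately for any logical type missing from the mapping.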
RE: NanosInstant not being recognised by BigQueryIO.Write
Hi Cham,

Yes, I'm happy to when I get a moment.

Kind regards,
Rob

From: Chamikara Jayalath [mailto:chamik...@google.com]
Sent: 15 July 2020 16:36
To: dev
Subject: Re: NanosInstant not being recognised by BigQueryIO.Write

This does sound like a bug, since Beam Schema to BigQuery type conversion [1] indeed does not consider org.apache.beam.sdk.schemas.logicaltypes.NanosInstant. Will you be able to file a JIRA, preferably with a test to reproduce this?

[1] https://github.com/apache/beam/blob/6c313eb84af6229f0a8a7a0b5890f18c5a8685e8/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java#L202

Thanks,
Cham
NanosInstant not being recognised by BigQueryIO.Write
Hi All,

I am posting this to the dev (as opposed to user) channel as I believe it will be of interest to those working on either Schemas or BigQuery.

I have a pipeline based on Beam 2.22 that is ingesting data into BigQuery. Internally I am using protobuf for my domain model and the associated schema support. My intention is to make use of the useBeamSchema() method both to auto-generate the BigQuery table schema and to provide row conversion on write. (The idea is to have true schema-first development, very much in keeping with Alex's original ProtoBEAM concept.)

The issue I've hit is around the treatment of google.protobuf.Timestamp fields. The schema conversion seems to map these to the correct logical type, org.apache.beam.sdk.schemas.logicaltypes.NanosInstant; however, this isn't recognised by BigQueryIO.Write. Specifically, the BigQueryUtils.toTableSchema() method throws a NullPointerException. This seems to be due to the fact that there is no entry for NanosInstant in the BEAM_TO_BIG_QUERY_LOGICAL_MAPPING map.

Is this a known issue? Is there a workaround?

I appreciate that google.protobuf.Timestamp supports nanosecond-level precision, so it cannot be converted directly to the Beam schema type of DATETIME without loss of precision. However, I believe use cases for nanosecond precision are rare. Would it not be better to convert directly to DATETIME, according to the principle of least confusion? Are there any plans to extend the range of types, both within protobuf and the Beam schema, to match the richer type set within BigQuery (DATE, DATETIME, TIMESTAMP)? I would expect the combination of protobuf/Beam/BigQuery to be a common one (especially within GCP), and it would be nice as a developer to have a greater range of options.

Kind regards,
Rob

Robert Butcher
Technical Architect | Foundry/SRS | NatWest Markets
WeWork, 10 Devonshire Square, London, EC2M 4AE
Mobile +44 (0) 7414 730866

This email is classified as CONFIDENTIAL unless otherwise stated.
This communication and any attachments are confidential and intended solely for the addressee. If you are not the intended recipient please advise us immediately and delete it. Unless specifically stated in the message or otherwise indicated, you may not duplicate, redistribute or forward this message and any attachments are not intended for distribution to, or use by any person or entity in any jurisdiction or country where such distribution or use would be contrary to local law or regulation. NatWest Markets Plc or any affiliated entity ("NatWest Markets") accepts no responsibility for any changes made to this message after it was sent. Unless otherwise specifically indicated, the contents of this communication and its attachments are for information purposes only and should not be regarded as an offer or solicitation to buy or sell a product or service, confirmation of any transaction, a valuation, indicative price or an official statement. Trading desks may have a position or interest that is inconsistent with any views expressed in this message. In evaluating the information contained in this message, you should know that it could have been previously provided to other clients and/or internal NatWest Markets personnel, who could have already acted on it. NatWest Markets cannot provide absolute assurances that all electronic communications (sent or received) are secure, error free, not corrupted, incomplete or virus free and/or that they will not be lost, mis-delivered, destroyed, delayed or intercepted/decrypted by others. Therefore NatWest Markets disclaims all liability with regards to electronic communications (and the contents therein) if they are corrupted, lost destroyed, delayed, incomplete, mis-delivered, intercepted, decrypted or otherwise misappropriated by others. 
Any electronic communication that is conducted within or through NatWest Markets systems will be subject to being archived, monitored and produced to regulators and in litigation in accordance with NatWest Markets’ policy and local laws, rules and regulations. Unless expressly prohibited by local law, electronic communications may be archived in countries other than the country in which you are located, and may be treated in accordance with the laws and regulations of the country of each individual included in the entire chain. Copyright NatWest Markets Plc. All rights reserved. See https://www.nwm.com/disclaimer for further risk disclosure.
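The precision loss discussed above can be seen directly with the JDK's own java.time types, independently of Beam: truncating a nanosecond-precision instant to millisecond precision (roughly what a DATETIME-style conversion would imply) silently discards the sub-millisecond component. A small illustration, with made-up example values:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class PrecisionLoss {
    public static void main(String[] args) {
        // A google.protobuf.Timestamp-like value: whole seconds plus nanos.
        Instant nanosPrecise = Instant.ofEpochSecond(1_594_000_000L, 123_456_789L);

        // Truncating to milliseconds, as a DATETIME-style conversion would imply.
        Instant millisPrecise = nanosPrecise.truncatedTo(ChronoUnit.MILLIS);

        System.out.println(nanosPrecise);   // retains all nine nano digits
        System.out.println(millisPrecise);  // sub-millisecond digits are gone

        // The information irrecoverably lost by the conversion: 456,789 ns here.
        System.out.println(nanosPrecise.getNano() - millisPrecise.getNano());
    }
}
```

This is why a lossless NanosInstant mapping and a lossy-but-convenient DATETIME mapping are genuinely different design choices rather than interchangeable ones.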
RE: Unit-testing BEAM pipelines with PROCESSING_TIME timers
Many thanks, Darshan! I've made equivalent changes to my test case and it's working fine.

Kind regards,
Rob

From: Darshan Jani [mailto:darshanjani...@gmail.com]
Sent: 11 May 2020 15:08
To: dev@beam.apache.org
Subject: Re: Unit-testing BEAM pipelines with PROCESSING_TIME timers

Hi Robert,

I found this sample test with a timer on processing time. From the error, I assume there may be a problem with what you are asserting in your PAssert.

https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L3633-L3665

I ran it locally and it runs fine.

Regards,
Darshan
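The test Darshan links to works because the test advances the runner's processing-time clock explicitly rather than waiting on the wall clock. The underlying principle, that a processing-time timer only fires deterministically in a test if the clock is controllable, can be sketched with a stdlib-only analogue. This is a conceptual illustration with invented names, not Beam's actual timer machinery:

```java
public class ManualClockTimer {
    // A controllable clock: tests advance it explicitly instead of sleeping.
    static class ManualClock {
        private long nowMillis = 0;
        long now() { return nowMillis; }
        void advance(long millis) { nowMillis += millis; }
    }

    // A minimal relative timer, set against the clock's current time.
    static class Timer {
        private final ManualClock clock;
        private long deadline = Long.MAX_VALUE;
        private boolean fired = false;

        Timer(ManualClock clock) { this.clock = clock; }

        void setRelative(long offsetMillis) { deadline = clock.now() + offsetMillis; }

        // Checked whenever the clock moves; fires once the deadline is reached.
        boolean checkAndFire() {
            if (!fired && clock.now() >= deadline) {
                fired = true;
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        Timer timer = new Timer(clock);
        timer.setRelative(5_000); // "fire in 5 seconds", as in the DoFn below

        clock.advance(4_999);
        System.out.println(timer.checkAndFire()); // false: deadline not yet reached

        clock.advance(1);
        System.out.println(timer.checkAndFire()); // true: fires deterministically
    }
}
```

With an event-time timer the test's watermark plays the role of the manual clock, which is why the EVENT_TIME variant worked without any extra setup.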
Unit-testing BEAM pipelines with PROCESSING_TIME timers
I have a Beam DoFn that I'm attempting to unit test. It involves a timer based on processing time, and I've not managed to get it to fire. The relevant code excerpts are as follows:

    @TimerId("timer")
    private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @ProcessElement
    public void process(@TimerId("timer") Timer timer) {
        // Set a processing-time timer to fire in 5 seconds so we can poll BigQuery
        timer.offset(Duration.standardSeconds(5)).setRelative();
    }

    @OnTimer("timer")
    public void onTimer() {
        System.out.println("In onTimer");
    }

When I use a TestPipeline with an appropriate PAssert, it always results in the following exception:

    org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.util.NoSuchElementException
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
        at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
        at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
        at com.nwm.foundry.atomic.AtomicCommitFnTest.shouldGenerateCorrectEvent(AtomicCommitFnTest.java:28)
        at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
        ... (reflection, JUnit, and IDE runner frames elided)
    Caused by: java.util.NoSuchElementException
        at java.util.ArrayList$Itr.next(ArrayList.java:862)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:302)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:254)
        at org.apache.beam.sdk.testing.PAssert$SingletonCheckerDoFn.processElement(PAssert.java:1417)

Swapping the timer for an EVENT_TIME timer works fine. Is there a trick I'm missing here?
Kind regards,
Rob

Robert Butcher
Technical Architect | Foundry/SRS | NatWest Markets
WeWork, 10 Devonshire Square, London, EC2M 4AE
Mobile +44 (0) 7414 730866

This email is classified as CONFIDENTIAL unless otherwise stated.