Cool! Thanks a lot for your explanation and your time, Jeff, very much appreciated.
On Thu, 9 Jul 2020 at 17:27, Jeff Klukas <jklu...@mozilla.com> wrote:

> On Thu, Jul 9, 2020 at 10:18 AM Kirill Zhdanovich <kzhdanov...@gmail.com> wrote:
>
>> So I guess I need to switch to Map<String, Object> instead of TableRow?
>
> Yes, I would definitely recommend that you switch to Map<String, Object>. That's the most basic interface, and every deserialization of a top-level TableRow object must provide objects matching that interface wherever the BQ schema has a nested STRUCT/RECORD.
>
> Note that the latest docs for BigQueryIO do include a table that maps BQ types to Java types, but unfortunately that table lists STRUCTs as mapping to avro GenericRecord, which doesn't give you the info you need to understand the Map<String, Object> interface inside TableRows:
>
> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html
>
> You may want to file a JIRA ticket requesting more explicit documentation about how to parse structs out of TableRow objects.
>
>> On Thu, 9 Jul 2020 at 17:13, Jeff Klukas <jklu...@mozilla.com> wrote:
>>
>>> It looks like the fact that your pipeline in production produces nested TableRows is an artifact of the following decision within BigQueryIO logic:
>>>
>>> https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350
>>>
>>> The convertGenericRecordToTableRow function is used recursively for RECORD-type fields, so you end up with nested TableRows in the PCollection returned from BigQueryIO.read. But then the TableRowJsonCoder uses a Jackson ObjectMapper, which makes different decisions as to what map type to use.
>>>
>>> > Thanks for explaining. Is it documented somewhere that TableRow contains Map<String, Object>?
>>>
>>> I don't see that explicitly spelled out anywhere. If you follow the trail of links from TableRow, you'll get to these docs about Google's JSON handling in Java, which may or may not be relevant to this question:
>>>
>>> https://googleapis.github.io/google-http-java-client/json.html
>>>
>>> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich <kzhdanov...@gmail.com> wrote:
>>>
>>>> Thanks for explaining. Is it documented somewhere that TableRow contains Map<String, Object>? I don't construct it; I fetch it from a Google Analytics export to a BigQuery table.
>>>>
>>>> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas <jklu...@mozilla.com> wrote:
>>>>
>>>>> I would expect the following line to fail:
>>>>>
>>>>>     List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>>>
>>>>> The top-level bigQueryRow will be a TableRow, but `bigQueryRow.get("hits")` is only guaranteed to be an instance of some class that implements `Map`. So that line needs to become:
>>>>>
>>>>>     List<Map<String, Object>> h = ((List<Map<String, Object>>) bigQueryRow.get("hits"));
>>>>>
>>>>> And then your constructor for Hit must accept a Map<String, Object> rather than a TableRow.
>>>>>
>>>>> I imagine that TableRow is only intended to be used as a top-level object. Each row you get from a BQ result is a TableRow, but objects nested inside it are not logically table rows; they're BQ structs that are modeled as maps in JSON and as Map<String, Object> in Java.
>>>>>
>>>>> Are you manually constructing TableRow objects with nested TableRows? I would expect that a result from BigQueryIO.read() would give a TableRow with some other map type for nested structs, so I'm surprised that this cast works in some contexts.
>>>>>
>>>>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich <kzhdanov...@gmail.com> wrote:
>>>>>
>>>>>> I changed the code a little bit not to use lambdas.
>>>>>>
>>>>>> Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>>>>>     List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>>>>     List<Hit> hits = new ArrayList<>();
>>>>>>
>>>>>>     for (TableRow tableRow : h) { // <-- breaks here
>>>>>>         hits.add(new Hit(tableRow));
>>>>>>     }
>>>>>>     ...
>>>>>> }
>>>>>>
>>>>>> Stack trace:
>>>>>>
>>>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app')
>>>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app')
>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
>>>>>>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>>>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>>>>>>     at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
>>>>>>     at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
>>>>>>     at com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
>>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>>>>>>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>>>>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>>>>>>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>>>>     at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>>>>>>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>>>>>     at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>>>>>>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>>>>>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>>>>>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>>>>>>     at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>>>>>>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>>>>>>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>>>>>>     at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>>>>>>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>>>>>>     at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>>>>>     at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>>>>>>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>>>>>>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>>>>>>     at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>>>>>>     at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>>>>>>     at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>>>>>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>>>>>     at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>>>>>>     at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>>>>>>     at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>>>>>>     at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>>>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>>>>>     at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>>>>>     at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
>>>>>>     at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
>>>>>>     at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
>>>>>>     at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
>>>>>>     at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
>>>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>>>     at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
>>>>>>     at java.base/java.lang.Thread.run(Thread.java:834)
>>>>>> Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app')
>>>>>>     at com.ikea.search.ab.bootstrap.Session.<init>(Session.java:35)
>>>>>>     at com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics.processElement(Job.java:82)
>>>>>>
>>>>>> On Wed, 8 Jul 2020 at 23:59, Jeff Klukas <jklu...@mozilla.com> wrote:
>>>>>>
>>>>>>> Does the stack trace tell you where specifically in the code the cast is happening? I'm guessing there may be assumptions inside your CreateSessionMetrics class if that's where you're manipulating the TableRow objects.
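[Editor's note: the failing cast above can be reproduced without Beam at all, since it is purely a question of which concrete type a Jackson-style decoder uses for nested records. A minimal sketch follows; the field names and the LinkedHashMap are assumptions standing in for what TableRowJsonCoder produces, and only the Map interface is guaranteed.]

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NestedStructCast {
    // Hypothetical stand-in for a decoded row: nested RECORD fields come
    // back as some Map implementation (here LinkedHashMap), not TableRow.
    static Map<String, Object> decodedRow() {
        Map<String, Object> hit = new LinkedHashMap<>();
        hit.put("hitNumber", 1);
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("hits", List.of(hit));
        return row;
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        Map<String, Object> row = decodedRow();
        // Casting elements to TableRow (as the Session constructor does)
        // would throw ClassCastException on first use; casting only to the
        // Map interface is always safe, whatever the decoder produced.
        List<Map<String, Object>> hits = (List<Map<String, Object>>) row.get("hits");
        System.out.println(hits.get(0).get("hitNumber"));
    }
}
```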
>>>>>>>
>>>>>>> On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich <kzhdanov...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Interesting. All my code does is the following:
>>>>>>>>
>>>>>>>>     public static void main(String[] args) {
>>>>>>>>         PCollection<TableRow> bqResult =
>>>>>>>>             p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>>>>>>>>         PCollection<SomeClass> result = runJob(bqResult, /* boolean and string params */);
>>>>>>>>         // store results
>>>>>>>>     }
>>>>>>>>
>>>>>>>> and
>>>>>>>>
>>>>>>>>     private static PCollection<SomeClass> runJob(PCollection<TableRow> bqResult, ...) {
>>>>>>>>         return bqResult
>>>>>>>>             // In this step I convert TableRow into my custom class object
>>>>>>>>             .apply("Create metrics based on sessions",
>>>>>>>>                 ParDo.of(new CreateSessionMetrics(/* boolean and string params */)))
>>>>>>>>             // few more transformations
>>>>>>>>     }
>>>>>>>>
>>>>>>>> This is basically similar to the examples you can find here:
>>>>>>>> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>>>>>>>>
>>>>>>>> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <jklu...@mozilla.com> wrote:
>>>>>>>>
>>>>>>>>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <kzhdanov...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> So from what I understand, it works like this by design and it's not possible to test my code with the current coder implementation. Is that correct?
>>>>>>>>>
>>>>>>>>> I would argue that this test failure is indicating an area of potential failure in your code that should be addressed. It may be that your current production pipeline relies on fusion, which is not guaranteed by the Beam model, and so the pipeline could fail if the runner makes an internal change that affects fusion (in practice this is unlikely).
>>>>>>>>>
>>>>>>>>> Is it possible to update your code such that it does not need to make assumptions about the concrete Map type returned by TableRow objects?

--
Best Regards,
Kirill
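[Editor's note: for the archive, here is a minimal sketch of the fix this thread converges on. The constructor consumes nested records through the Map interface, so it works whether the decoder hands back TableRow, LinkedHashMap, or anything else. The Hit stub and its single field are illustrative assumptions; the real Hit class and the ProductCatalog parameter live in the original codebase.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class Session {
    // Hypothetical stub for the thread's Hit class, included only so the
    // sketch is self-contained; the real class accepts the same Map.
    static class Hit {
        final Map<String, Object> fields;
        Hit(Map<String, Object> fields) { this.fields = fields; }
    }

    final List<Hit> hits = new ArrayList<>();

    // Depend only on Map<String, Object>: every deserialization of a row
    // satisfies this interface for nested STRUCT/RECORD fields.
    @SuppressWarnings("unchecked")
    Session(Map<String, Object> bigQueryRow) {
        List<Map<String, Object>> h = (List<Map<String, Object>>) bigQueryRow.get("hits");
        for (Map<String, Object> fields : h) {
            hits.add(new Hit(fields));
        }
    }
}
```

At the call site nothing changes for top-level rows: TableRow itself implements Map<String, Object> (via GenericJson/GenericData), so it can be passed to this constructor directly.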