On Thu, Jul 9, 2020 at 10:18 AM Kirill Zhdanovich <kzhdanov...@gmail.com>
wrote:

> So I guess I need to switch to Map<String, Object> instead of TableRow?
>

Yes, I would definitely recommend that you switch to Map<String, Object>.
That's the most basic interface: any deserialization of a top-level TableRow
is only guaranteed to give you objects implementing that interface wherever
the BQ schema has a nested STRUCT/RECORD.
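
For example, here's a rough sketch of a DoFn that reads a repeated RECORD
field that way. The schema and the "hits"/"hitNumber" field names are just
assumptions on my part (modeled on the GA export), so adapt them to your
table:

    import com.google.api.services.bigquery.model.TableRow;
    import java.util.List;
    import java.util.Map;
    import org.apache.beam.sdk.transforms.DoFn;

    // Sketch only: read nested RECORDs as Map<String, Object>, never TableRow.
    class ExtractHitNumbers extends DoFn<TableRow, Long> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        TableRow row = c.element();
        // "hits" is assumed to be a REPEATED RECORD field in the BQ schema.
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> hits = (List<Map<String, Object>>) row.get("hits");
        if (hits == null) {
          return;
        }
        for (Map<String, Object> hit : hits) {
          Object hitNumber = hit.get("hitNumber"); // hypothetical INTEGER column
          if (hitNumber != null) {
            // Depending on the read path the value may arrive as a Long or a String.
            c.output(Long.valueOf(hitNumber.toString()));
          }
        }
      }
    }

Written against Map, the same code works whether the nested values happen to
be TableRow instances or some other Map implementation like LinkedHashMap.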

Note that the latest docs for BigQueryIO do include a table that maps BQ
types to Java types, but unfortunately that table lists STRUCTs as mapping
to Avro GenericRecord, which doesn't tell you about the Map<String, Object>
interface you'll see inside TableRows:

https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html

You may want to file a JIRA ticket requesting more explicit documentation
about how to parse structs out of TableRow objects.


> On Thu, 9 Jul 2020 at 17:13, Jeff Klukas <jklu...@mozilla.com> wrote:
>
>> It looks like the fact that your pipeline in production produces nested
>> TableRows is an artifact of the following decision within BigQueryIO logic:
>>
>>
>> https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350
>>
>> The convertGenericRecordToTableRow function is applied recursively to
>> RECORD-type fields, so you end up with nested TableRows in the PCollection
>> returned from BigQueryIO.read. But the TableRowJsonCoder uses a Jackson
>> ObjectMapper, which makes a different choice of concrete Map type when it
>> decodes, which is why nested records come back as LinkedHashMap after a
>> serialization round trip.
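>>
>> If it helps, here's a minimal standalone sketch of that difference (the
>> field names are made up, and I'm using Beam's CoderUtils helper just to
>> force a round trip through the coder by hand):
>>
>>     import com.google.api.services.bigquery.model.TableRow;
>>     import java.util.Collections;
>>     import java.util.List;
>>     import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
>>     import org.apache.beam.sdk.util.CoderUtils;
>>
>>     public class CoderRoundTrip {
>>       public static void main(String[] args) throws Exception {
>>         // Build a row the way BigQueryAvroUtils does: the nested record is a TableRow.
>>         TableRow row = new TableRow()
>>             .set("hits", Collections.singletonList(new TableRow().set("hitNumber", 1)));
>>
>>         // Encode and decode with the coder used for TableRow elements.
>>         byte[] bytes = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), row);
>>         TableRow decoded = CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), bytes);
>>
>>         // After the round trip, the nested element is whatever Map implementation
>>         // Jackson picks (LinkedHashMap in your stack trace), not a TableRow.
>>         System.out.println(((List<?>) decoded.get("hits")).get(0).getClass());
>>       }
>>     }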
>>
>> > Thanks for explaining. Is it documented somewhere that TableRow
>> contains Map<String, Object>?
>>
>> I don't see that explicitly spelled out anywhere. If you follow the trail
>> of links from TableRow, you'll get to these docs about Google's JSON
>> handling in Java, which may or may not be relevant to this question:
>>
>> https://googleapis.github.io/google-http-java-client/json.html
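>>
>> One thing that does fall out of that class hierarchy, if I'm reading it
>> right: TableRow extends GenericJson, which is itself a Map<String, Object>,
>> so code written against Map works for the top-level row as well as for the
>> nested records. A trivial sketch (the field name is made up):
>>
>>     import com.google.api.services.bigquery.model.TableRow;
>>     import java.util.Map;
>>
>>     public class TableRowIsAMap {
>>       public static void main(String[] args) {
>>         // A TableRow can be assigned to Map<String, Object> directly, so the
>>         // same Map-based access works at every level of nesting.
>>         Map<String, Object> row = new TableRow().set("visitId", 123); // hypothetical field
>>         System.out.println(row.get("visitId"));
>>       }
>>     }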
>>
>>
>>
>> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich <kzhdanov...@gmail.com>
>> wrote:
>>
>>> Thanks for explaining. Is it documented somewhere that TableRow contains
>>> Map<String, Object>?
>>> I don't construct it, I fetch from Google Analytics export to BigQuery
>>> table.
>>>
>>> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas <jklu...@mozilla.com> wrote:
>>>
>>>> I would expect the following line to fail:
>>>>
>>>>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>>
>>>> The top-level bigQueryRow will be a TableRow, but
>>>> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
>>>> class that implements `Map`. So that line needs to become:
>>>>
>>>>         List<Map<String, Object>> h = ((List<Map<String, Object>>) bigQueryRow.get("hits"));
>>>>
>>>> And then your constructor for Hit must accept a Map<String, Object>
>>>> rather than a TableRow.
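>>>>
>>>> Concretely, here's a rough sketch of the Session constructor with that
>>>> change (keeping the names from your snippet; the Hit change is analogous,
>>>> and anything beyond what you posted is a guess on my part):
>>>>
>>>>     Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>>>         // Nested RECORDs are only guaranteed to be Maps; don't cast them to TableRow.
>>>>         @SuppressWarnings("unchecked")
>>>>         List<Map<String, Object>> h = (List<Map<String, Object>>) bigQueryRow.get("hits");
>>>>         List<Hit> hits = new ArrayList<>();
>>>>
>>>>         for (Map<String, Object> hit : h) {
>>>>             hits.add(new Hit(hit)); // Hit now takes a Map<String, Object>
>>>>         }
>>>>         // ... rest of the constructor unchanged
>>>>     }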
>>>>
>>>> I imagine that TableRow is only intended to be used as a top-level
>>>> object. Each row you get from a BQ result is a TableRow, but objects nested
>>>> inside it are not logically table rows; they're BQ structs that are modeled
>>>> as maps in JSON and Map<String, Object> in Java.
>>>>
>>>> Are you manually constructing TableRow objects with nested TableRows? I
>>>> would expect that a result from BigQueryIO.read() would give a TableRow
>>>> with some other map type for nested structs, so I'm surprised that this
>>>> cast works in some contexts.
>>>>
>>>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich <kzhdanov...@gmail.com>
>>>> wrote:
>>>>
>>>>>    I changed the code a little bit so it doesn't use lambdas.
>>>>>
>>>>>    Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>>>>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>>>         List<Hit> hits = new ArrayList<>();
>>>>>
>>>>>         for (TableRow tableRow : h) { <-- breaks here
>>>>>             hits.add(new Hit(tableRow));
>>>>>         }
>>>>>         ...
>>>>>     }
>>>>>
>>>>> Stack trace
>>>>>
>>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app')
>>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app')
>>>>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
>>>>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
>>>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
>>>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>>>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
>>>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
>>>>> at com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>>>>> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>>> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>>>>> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>>> at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>>>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>>>> at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>>>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>>>>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>>>>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>>>>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>>>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>>>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>>>>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>>>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>>>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>>>>> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>>>>> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>>>>> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>>>>> at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>>>>> at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>>>> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>>>> at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>>>>> at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>>>>> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>>>>> at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>>>> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>>>> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>>>> at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
>>>>> at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
>>>>> at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
>>>>> at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
>>>>> at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
>>>>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>>>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>>>> at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
>>>>> at java.base/java.lang.Thread.run(Thread.java:834)
>>>>> Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app')
>>>>> at com.ikea.search.ab.bootstrap.Session.<init>(Session.java:35)
>>>>> at com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics.processElement(Job.java:82)
>>>>>
>>>>> On Wed, 8 Jul 2020 at 23:59, Jeff Klukas <jklu...@mozilla.com> wrote:
>>>>>
>>>>>> Does the stack trace tell you where specifically in the code the cast
>>>>>> is happening? I'm guessing there may be assumptions inside your
>>>>>> CreateSessionMetrics class if that's where you're manipulating the 
>>>>>> TableRow
>>>>>> objects.
>>>>>>
>>>>>> On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich <
>>>>>> kzhdanov...@gmail.com> wrote:
>>>>>>
>>>>>>> Interesting. All my code does is the following:
>>>>>>>
>>>>>>> public static void main(String[] args) {
>>>>>>>     PCollection<TableRow> bqResult = p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>>>>>>>     PCollection<SomeClass> result = runJob(bqResult, boolean and string params);
>>>>>>>     // store results
>>>>>>> }
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>> private static PCollection<SomeClass> runJob(PCollection<TableRow> bqResult,
>>>>>>>         ...) {
>>>>>>>         return bqResult
>>>>>>>                 // In this step I convert TableRow into my custom class object
>>>>>>>                 .apply("Create metrics based on sessions",
>>>>>>>                         ParDo.of(new CreateSessionMetrics(boolean and string params)))
>>>>>>>                 // few more transformations
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> This is basically similar to examples you can find here
>>>>>>> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>>>>>>>
>>>>>>> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <jklu...@mozilla.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <
>>>>>>>> kzhdanov...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> So from what I understand, it works like this by design and it's
>>>>>>>>> not possible to test my code with the current coder implementation. 
>>>>>>>>> Is that
>>>>>>>>> correct?
>>>>>>>>>
>>>>>>>>
>>>>>>>> I would argue that this test failure indicates an area of potential
>>>>>>>> failure in your code that should be addressed. It may be that your current
>>>>>>>> production pipeline relies on fusion, which is not guaranteed by the Beam
>>>>>>>> model, and so the pipeline could fail if the runner makes an internal
>>>>>>>> change that affects fusion (in practice this is unlikely).
>>>>>>>>
>>>>>>>> Is it possible to update your code such that it does not need to
>>>>>>>> make assumptions about the concrete Map type returned by TableRow 
>>>>>>>> objects?
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards,
>>>>>>> Kirill
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Kirill
>>>>>
>>>>
>>>
>>> --
>>> Best Regards,
>>> Kirill
>>>
>>
>
> --
> Best Regards,
> Kirill
>
