It looks like the nested TableRows your pipeline produces in production are
an artifact of the following decision within the BigQueryIO logic:

https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350

The convertGenericRecordToTableRow function is used recursively for
RECORD-type fields, so you end up with nested TableRows in the PCollection
returned from BigQueryIO.read. But then the TableRowJsonCoder uses a
Jackson ObjectMapper, which makes different decisions as to what map type
to use.
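
The practical consequence: code should treat nested RECORD values as Map<String, Object> rather than any concrete class, since the concrete type depends on whether the element went through a coder. A minimal stdlib-only sketch (the helper name is hypothetical, not part of Beam; the LinkedHashMaps here stand in for what the Jackson round-trip produces):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NestedRecordAccess {

    // Hypothetical helper: read a repeated RECORD field without assuming a
    // concrete map class. It works whether the nested values are TableRows
    // (as built in-process by convertGenericRecordToTableRow) or plain
    // LinkedHashMaps (as produced by TableRowJsonCoder's Jackson round-trip),
    // since both implement Map<String, Object>.
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> getRepeatedRecord(
            Map<String, Object> row, String field) {
        return (List<Map<String, Object>>) row.get(field);
    }

    public static void main(String[] args) {
        // Simulate what the coder hands back after serialization:
        // plain LinkedHashMaps instead of TableRows.
        Map<String, Object> hit = new LinkedHashMap<>();
        hit.put("hitNumber", 1);
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("hits", List.of(hit));

        for (Map<String, Object> h : getRepeatedRecord(row, "hits")) {
            System.out.println(h.get("hitNumber")); // prints 1
        }
    }
}
```

The unchecked cast is still a cast, but it only assumes the Map interface, which both representations satisfy.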

> Thanks for explaining. Is it documented somewhere that TableRow contains
> Map<String, Object>?

I don't see that explicitly spelled out anywhere. If you follow the trail
of links from TableRow, you'll get to these docs about Google's JSON
handling in Java, which may or may not be relevant to this question:

https://googleapis.github.io/google-http-java-client/json.html
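
Putting the thread's advice together, the Session constructor discussed below could be rewritten so that nothing is cast to TableRow. This is only a sketch: Hit and the field names are the user's own, and the Hit stand-in here is purely illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the Session constructor from the thread, rewritten so that
// only the top-level row needs to be map-like.
class Session {
    final List<Hit> hits = new ArrayList<>();

    @SuppressWarnings("unchecked")
    Session(Map<String, Object> bigQueryRow) {
        // Nested RECORD fields are only guaranteed to implement Map;
        // after a TableRowJsonCoder round-trip they are LinkedHashMaps.
        List<Map<String, Object>> h =
                (List<Map<String, Object>>) bigQueryRow.get("hits");
        for (Map<String, Object> record : h) {
            hits.add(new Hit(record));
        }
    }
}

// Minimal stand-in for the user's Hit class, for illustration only.
class Hit {
    final Map<String, Object> fields;

    Hit(Map<String, Object> fields) {
        this.fields = fields;
    }
}
```

Since TableRow itself implements Map<String, Object>, the same constructor accepts the top-level rows from BigQueryIO.read unchanged.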



On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich <kzhdanov...@gmail.com>
wrote:

> Thanks for explaining. Is it documented somewhere that TableRow contains
> Map<String, Object>?
> I don't construct it, I fetch from Google Analytics export to BigQuery
> table.
>
> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas <jklu...@mozilla.com> wrote:
>
>> I would expect the following line to fail:
>>
>>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>
>> The top-level bigQueryRow will be a TableRow, but
>> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
>> class that implements `Map`. So that line needs to become:
>>
>>         List<Map<String, Object>> h = ((List<Map<String, Object>>)
>> bigQueryRow.get("hits"));
>>
>> And then your constructor for Hit must accept a Map<String, Object>
>> rather than a TableRow.
>>
>> I imagine that TableRow is only intended to be used as a top-level
>> object. Each row you get from a BQ result is a TableRow, but objects nested
>> inside it are not logically table rows; they're BQ structs that are modeled
>> as maps in JSON and Map<String, Object> in Java.
>>
>> Are you manually constructing TableRow objects with nested TableRows? I
>> would expect that a result from BigQueryIO.read() would give a TableRow
>> with some other map type for nested structs, so I'm surprised that this
>> cast works in some contexts.
>>
>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich <kzhdanov...@gmail.com>
>> wrote:
>>
>>>    I changed the code a little bit to not use lambdas.
>>>
>>>    Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>>         List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>         List<Hit> hits = new ArrayList<>();
>>>
>>>         for (TableRow tableRow : h) { <-- breaks here
>>>             hits.add(new Hit(tableRow));
>>>         }
>>>         ...
>>>     }
>>>
>>> Stack trace
>>>
>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app')
>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app')
>>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
>>> at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
>>> at com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>>> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>>> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>> at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>> at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>>> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>>> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>>> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>>> at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>>> at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>>> at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>> at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>>> at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>>> at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>>> at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:118)
>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>>> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>>> at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>>> at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
>>> at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
>>> at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
>>> at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
>>> at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
>>> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
>>> at java.base/java.lang.Thread.run(Thread.java:834)
>>> Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app')
>>> at com.ikea.search.ab.bootstrap.Session.<init>(Session.java:35)
>>> at com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics.processElement(Job.java:82)
>>>
>>> On Wed, 8 Jul 2020 at 23:59, Jeff Klukas <jklu...@mozilla.com> wrote:
>>>
>>>> Does the stack trace tell you where specifically in the code the cast
>>>> is happening? I'm guessing there may be assumptions inside your
>>>> CreateSessionMetrics class if that's where you're manipulating the TableRow
>>>> objects.
>>>>
>>>> On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich <kzhdanov...@gmail.com>
>>>> wrote:
>>>>
>>>>> Interesting. All my code does is the following:
>>>>>
>>>>> public static void main(String[] args) {
>>>>>     PCollection<TableRow> bqResult =
>>>>> p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>>>>>     PCollection<SomeClass> result = runJob(bqResult, boolean and
>>>>> string params);
>>>>>     // store results
>>>>> }
>>>>>
>>>>> and
>>>>>
>>>>> private static PCollection<SomeClass> runJob(PCollection<TableRow>
>>>>> bqResult,
>>>>>
>>>>>       ...) {
>>>>>         return bqResult
>>>>>                 // In this step I convert TableRow into my custom
>>>>> class object
>>>>>                 .apply("Create metrics based on sessions",
>>>>>                         ParDo.of(new CreateSessionMetrics(boolean and
>>>>> string params)))
>>>>>                // few more transformations
>>>>>
>>>>> }
>>>>>
>>>>> This is basically similar to examples you can find here
>>>>> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>>>>>
>>>>> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas <jklu...@mozilla.com> wrote:
>>>>>
>>>>>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich <
>>>>>> kzhdanov...@gmail.com> wrote:
>>>>>>
>>>>>>> So from what I understand, it works like this by design and it's not
>>>>>>> possible to test my code with the current coder implementation. Is that
>>>>>>> correct?
>>>>>>>
>>>>>>
>>>>>> I would argue that this test failure is indicating an area of
>>>>>> potential failure in your code that should be addressed. It may be that
>>>>>> your current production pipeline relies on fusion, which is not guaranteed
>>>>>> by the Beam model, and so the pipeline could fail if the runner makes an
>>>>>> internal change that affects fusion (in practice this is unlikely).
>>>>>>
>>>>>> Is it possible to update your code such that it does not need to make
>>>>>> assumptions about the concrete Map type returned by TableRow objects?
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Kirill
>>>>>
>>>>
>>>
>>> --
>>> Best Regards,
>>> Kirill
>>>
>>
>
> --
> Best Regards,
> Kirill
>
