Re: TableRow class is not the same after serialization

2020-07-09 Thread Kirill Zhdanovich
Cool! Thanks a lot for your explanation and your time, Jeff, very much
appreciated.

On Thu, 9 Jul 2020 at 17:27, Jeff Klukas  wrote:

> On Thu, Jul 9, 2020 at 10:18 AM Kirill Zhdanovich 
> wrote:
>
>> So I guess I need to switch to Map<String, Object> instead of TableRow?
>>
>
> Yes, I would definitely recommend that you switch to Map<String, Object>.
> That's the most basic interface, and every deserialization of a top-level
> TableRow object must provide objects matching that interface wherever the
> BQ schema has a nested STRUCT/RECORD.
>
> Note that the latest docs for BigQueryIO do include a table that maps BQ
> types to Java types, but unfortunately that table lists STRUCTs as mapping
> to avro GenericRecord, which doesn't give you the info you need to
> understand the Map interface inside TableRows:
>
>
> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html
>
> You may want to file a JIRA ticket requesting more explicit documentation
> about how to parse structs out of TableRow objects.
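>
> In the meantime, a minimal sketch of the Map-based approach applied to the
> code from this thread (it assumes the Hit constructor is changed to accept
> Map<String, Object>; names are otherwise illustrative):
>
> Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>   // Nested RECORDs are only guaranteed to implement Map<String, Object>.
>   List<Map<String, Object>> h =
>       (List<Map<String, Object>>) bigQueryRow.get("hits");
>   List<Hit> hits = new ArrayList<>();
>   for (Map<String, Object> hit : h) {
>     hits.add(new Hit(hit));
>   }
>   ...
> }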
>
>
>> On Thu, 9 Jul 2020 at 17:13, Jeff Klukas  wrote:
>>
>>> It looks like the fact that your pipeline in production produces nested
>>> TableRows is an artifact of the following decision within BigQueryIO logic:
>>>
>>>
>>> https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350
>>>
>>> The convertGenericRecordToTableRow function is used recursively for
>>> RECORD-type fields, so you end up with nested TableRows in the PCollection
>>> returned from BigQueryIO.read. But then the TableRowJsonCoder uses a
>>> Jackson ObjectMapper, which makes different decisions as to what map type
>>> to use.
>>>
>>> > Thanks for explaining. Is it documented somewhere that TableRow
>>> contains Map?
>>>
>>> I don't see that explicitly spelled out anywhere. If you follow the
>>> trail of links from TableRow, you'll get to these docs about Google's JSON
>>> handling in Java, which may or may not be relevant to this question:
>>>
>>> https://googleapis.github.io/google-http-java-client/json.html
>>>
>>>
>>>
>>> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich 
>>> wrote:
>>>
>>>> Thanks for explaining. Is it documented somewhere that TableRow
>>>> contains Map?
>>>> I don't construct it; I fetch it from the Google Analytics export to
>>>> BigQuery table.
>>>>
>>>> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas  wrote:
>>>>
>>>>> I would expect the following line to fail:
>>>>>
>>>>> List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>>>
>>>>> The top-level bigQueryRow will be a TableRow, but
>>>>> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
>>>>> class that implements `Map`. So that line needs to become:
>>>>>
>>>>> List<Map<String, Object>> h = ((List<Map<String, Object>>)
>>>>> bigQueryRow.get("hits"));
>>>>>
>>>>> And then your constructor for Hit must accept a Map<String, Object>
>>>>> rather than a TableRow.
>>>>>
>>>>> I imagine that TableRow is only intended to be used as a top-level
>>>>> object. Each row you get from a BQ result is a TableRow, but objects
>>>>> nested inside it are not logically table rows; they're BQ structs that
>>>>> are modeled as maps in JSON and Map<String, Object> in Java.
>>>>>
>>>>> Are you manually constructing TableRow objects with nested TableRows?
>>>>> I would expect that a result from BigQueryIO.read() would give a TableRow
>>>>> with some other map type for nested structs, so I'm surprised that this
>>>>> cast works in some contexts.
>>>>>
>>>>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich <
>>>>> kzhdanov...@gmail.com> wrote:
>>>>>
>>>>>> I changed the code a little bit not to use lambdas.
>>>>>>
>>>>>> Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>>>>> List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>>>> List<Hit> hits = new ArrayList<>();
>>>>>>
>>>>>> for (TableRow tableRow : h) { <-- breaks here
>>

Re: TableRow class is not the same after serialization

2020-07-09 Thread Kirill Zhdanovich
So I guess I need to switch to Map<String, Object> instead of TableRow?

On Thu, 9 Jul 2020 at 17:13, Jeff Klukas  wrote:

> It looks like the fact that your pipeline in production produces nested
> TableRows is an artifact of the following decision within BigQueryIO logic:
>
>
> https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350
>
> The convertGenericRecordToTableRow function is used recursively for
> RECORD-type fields, so you end up with nested TableRows in the PCollection
> returned from BigQueryIO.read. But then the TableRowJsonCoder uses a
> Jackson ObjectMapper, which makes different decisions as to what map type
> to use.
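>
> You can see the difference directly by round-tripping a nested TableRow
> through that coder. A minimal sketch (CoderUtils here is the SDK's
> org.apache.beam.sdk.util.CoderUtils; the field names are illustrative):
>
> TableRow row = new TableRow()
>     .set("device", new TableRow().set("isMobile", true));
> // clone() encodes and then decodes with the given coder (throws CoderException).
> TableRow copy = CoderUtils.clone(TableRowJsonCoder.of(), row);
> System.out.println(row.get("device").getClass());  // TableRow
> System.out.println(copy.get("device").getClass()); // java.util.LinkedHashMap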
>
> > Thanks for explaining. Is it documented somewhere that TableRow contains
> Map?
>
> I don't see that explicitly spelled out anywhere. If you follow the trail
> of links from TableRow, you'll get to these docs about Google's JSON
> handling in Java, which may or may not be relevant to this question:
>
> https://googleapis.github.io/google-http-java-client/json.html
>
>
>
> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich 
> wrote:
>
>> Thanks for explaining. Is it documented somewhere that TableRow contains
>> Map?
>> I don't construct it; I fetch it from the Google Analytics export to
>> BigQuery table.
>>
>> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas  wrote:
>>
>>> I would expect the following line to fail:
>>>
>>> List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>
>>> The top-level bigQueryRow will be a TableRow, but
>>> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
>>> class that implements `Map`. So that line needs to become:
>>>
>>> List<Map<String, Object>> h = ((List<Map<String, Object>>)
>>> bigQueryRow.get("hits"));
>>>
>>> And then your constructor for Hit must accept a Map<String, Object>
>>> rather than a TableRow.
>>>
>>> I imagine that TableRow is only intended to be used as a top-level
>>> object. Each row you get from a BQ result is a TableRow, but objects nested
>>> inside it are not logically table rows; they're BQ structs that are modeled
>>> as maps in JSON and Map<String, Object> in Java.
>>>
>>> Are you manually constructing TableRow objects with nested TableRows? I
>>> would expect that a result from BigQueryIO.read() would give a TableRow
>>> with some other map type for nested structs, so I'm surprised that this
>>> cast works in some contexts.
>>>
>>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich 
>>> wrote:
>>>
>>>> I changed the code a little bit not to use lambdas.
>>>>
>>>> Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>>> List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>> List<Hit> hits = new ArrayList<>();
>>>>
>>>> for (TableRow tableRow : h) { <-- breaks here
>>>> hits.add(new Hit(tableRow));
>>>> }
>>>> ...
>>>> }
>>>>
>>>> Stack trace
>>>>
>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>>>> cast to class com.google.api.services.bigquery.model.TableRow
>>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>>>> loader 'app')
>>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
>>>> to class com.google.api.services.bigquery.model.TableRow
>>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>>>> loader 'app')
>>>> at
>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
>>>> at
>>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
>>>> at
>>>> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
>>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
>>>> at org.apache.

Re: TableRow class is not the same after serialization

2020-07-09 Thread Kirill Zhdanovich
Thanks for explaining. Is it documented somewhere that TableRow contains
Map?
I don't construct it; I fetch it from the Google Analytics export to
BigQuery table.

On Thu, 9 Jul 2020 at 16:40, Jeff Klukas  wrote:

> I would expect the following line to fail:
>
> List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>
> The top-level bigQueryRow will be a TableRow, but
> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
> class that implements `Map`. So that line needs to become:
>
> List<Map<String, Object>> h = ((List<Map<String, Object>>)
> bigQueryRow.get("hits"));
>
> And then your constructor for Hit must accept a Map<String, Object> rather
> than a TableRow.
>
> I imagine that TableRow is only intended to be used as a top-level object.
> Each row you get from a BQ result is a TableRow, but objects nested inside
> it are not logically table rows; they're BQ structs that are modeled as
> maps in JSON and Map<String, Object> in Java.
>
> Are you manually constructing TableRow objects with nested TableRows? I
> would expect that a result from BigQueryIO.read() would give a TableRow
> with some other map type for nested structs, so I'm surprised that this
> cast works in some contexts.
>
> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich 
> wrote:
>
>> I changed the code a little bit not to use lambdas.
>>
>> Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>> List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>> List<Hit> hits = new ArrayList<>();
>>
>> for (TableRow tableRow : h) { <-- breaks here
>> hits.add(new Hit(tableRow));
>> }
>> ...
>> }
>>
>> Stack trace
>>
>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>> cast to class com.google.api.services.bigquery.model.TableRow
>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>> loader 'app')
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
>> to class com.google.api.services.bigquery.model.TableRow
>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
>> com.google.api.services.bigquery.model.TableRow is in unnamed module of
>> loader 'app')
>> at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
>> at
>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350)
>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331)
>> at
>> com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at
>> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>> at
>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>> at
>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> at
>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>> at
>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> at
>> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>> at
>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>>

Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
va:62)
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at
org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175)
at
org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157)
at
org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
at
org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
at
org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at
org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap
cannot be cast to class com.google.api.services.bigquery.model.TableRow
(java.util.LinkedHashMap is in module java.base of loader 'bootstrap';
com.google.api.services.bigquery.model.TableRow is in unnamed module of
loader 'app')
at com.ikea.search.ab.bootstrap.Session.(Session.java:35)
at
com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics.processElement(Job.java:82)

On Wed, 8 Jul 2020 at 23:59, Jeff Klukas  wrote:

> Does the stack trace tell you where specifically in the code the cast is
> happening? I'm guessing there may be assumptions inside your
> CreateSessionMetrics class if that's where you're manipulating the TableRow
> objects.
>
> On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich 
> wrote:
>
>> Interesting. All my code does is the following:
>>
>> public static void main(String[] args) {
>> PCollection<TableRow> bqResult =
>> p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
>> PCollection result = runJob(bqResult, boolean and string
>> params);
>> // store results
>> }
>>
>> and
>>
>> private static PCollection runJob(PCollection<TableRow>
>> bqResult,
>>
>>   ...) {
>> return bqResult
>> // In this step I convert TableRow into my custom class
>> object
>> .apply("Create metrics based on sessions",
>> ParDo.of(new CreateSessionMetrics(boolean and
>> string params)))
>>// few more transformations
>>
>> }
>>
>> This is basically similar to examples you can find here
>> https://beam.apache.org/documentation/io/built-in/google-bigquery/
>>
>> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas  wrote:
>>
>>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich 
>>> wrote:
>>>
>>>> So from what I understand, it works like this by design and it's not
>>>> possible to test my code with the current coder implementation. Is that
>>>> correct?
>>>>
>>>
>>> I would argue that this test failure is indicating an area of potential
>>> failure in your code that should be addressed. It may be that your current
>>> production pipeline relies on fusion which is not guaranteed by the Beam
>>> model, and so the pipeline could fail if the runner makes an internal
>>> change that affects fusion (in practice this is unlikely).
>>>
>>> Is it possible to update your code such that it does not need to make
>>> assumptions about the concrete Map type returned by TableRow objects?
>>>
>>
>>
>> --
>> Best Regards,
>> Kirill
>>
>

-- 
Best Regards,
Kirill


Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Kirill Zhdanovich
Cool! Thanks a lot for your help, Luke.

On Wed, 8 Jul 2020 at 23:45, Luke Cwik  wrote:

> The problem is that CreateSessionMetrics has an instance of
> com.ikea.search.ab.common.ProductCatalog$Product as part of its
> serialization closure. Does it have a member variable of that type, or
> refer to one that is part of its parent's class, or ...? Should it be
> marked transient?
>
> Marking it as serializable is necessary if you want to keep it part of
> that class's closure and is unrelated to the usage of @DefaultCoder.
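>
> For example, a sketch of the transient route (the types and the load()
> factory are illustrative, not taken from your code):
>
> static class CreateSessionMetrics extends DoFn<TableRow, TableRow> {
>   // transient: excluded from the DoFn's serialized closure; rebuilt on
>   // each worker instead of being shipped with the function.
>   private transient ProductCatalog catalog;
>
>   @Setup
>   public void setup() {
>     catalog = ProductCatalog.load();  // hypothetical factory method
>   }
>
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     c.output(c.element());
>   }
> }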
>
> Here are some articles[1, 2, 3] about Java serialization and what is
> captured that could help if you want additional background.
>
> 1: https://www.cs.unh.edu/~charpov/programming-serial-lambda.html
> 2: http://www.lifeisafile.com/Serialization-in-spark/
> 3:
> https://stackoverflow.com/questions/40259196/understanding-sparks-closures-and-their-serialization
>
>
> On Wed, Jul 8, 2020 at 1:22 PM Kirill Zhdanovich 
> wrote:
>
>> Hello Luke,
>> I will try to get the "22 more" soon, not sure how to do it though
>>
>> 23:11:11.339 [ERROR] [system.err] Exception in thread "main"
>> java.lang.IllegalArgumentException: unable to serialize
>> DoFnWithExecutionInformation{doFn=com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics@444f44c5,
>> mainOutputTag=Tag:400#6929f09b03d242ca>,
>> sideInputMapping={},
>> schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
>> 23:11:11.339 [ERROR] [system.err] at
>> org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
>> 23:11:11.339 [ERROR] [system.err] at
>> org.apache.beam.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:606)
>> 23:11:11.339 [ERROR] [system.err] at
>> org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator$1.translateDoFn(PrimitiveParDoSingleFactory.java:197)
>> 23:11:11.339 [ERROR] [system.err] at
>> org.apache.beam.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:738)
>> 23:11:11.339 [ERROR] [system.err] at
>> org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle(PrimitiveParDoSingleFactory.java:193)
>> 23:11:11.340 [ERROR] [system.err] at
>> org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate(PrimitiveParDoSingleFactory.java:154)
>> 23:11:11.340 [ERROR] [system.err] at
>> org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate(PTransformTranslation.java:417)
>> 23:11:11.340 [ERROR] [system.err] at
>> org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:225)
>> 23:11:11.340 [ERROR] [system.err] at
>> org.apache.beam.runners.core.construction.SdkComponents.registerPTransform(SdkComponents.java:157)
>> 23:11:11.340 [ERROR] [system.err] at
>> org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform(PipelineTranslation.java:87)
>> 23:11:11.340 [ERROR] [system.err] at
>> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
>> 23:11:11.340 [ERROR] [system.err] at
>> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
>> 23:11:11.340 [ERROR] [system.err] at
>> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
>> 23:11:11.340 [ERROR] [system.err] at
>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
>> 23:11:11.341 [ERROR] [system.err] at
>> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
>> 23:11:11.341 [ERROR] [system.err] at
>> org.apache.beam.runners.core.construction.PipelineTranslation.toProto(PipelineTranslation.java:59)
>> 23:11:11.341 [ERROR] [system.err] at
>> org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:187)
>> 23:11:11.341 [ERROR] [system.err] at
>> org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795)
>> 23:11:11.341 [ERROR] [system.err] at
>> org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
>> 23:11:11.341 [ERROR] [system.err] at
>> org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>> 23:11:11.341 [ERROR] [system.err] at
>> org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
>> 23:11:11.341 [ERROR] [system.err] at
>> com.ikea.search.ab.bootstrap.Job.testPipeline(Job.java:185)
>> 23:11:11.341 [ERROR] [system.err] at
>> com.ikea.search.ab.bootstrap.Job.main(Job.java:196)
>> 23:11:11.342 [ERROR] [system.err] C

Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
Interesting. All my code does is the following:

public static void main(String[] args) {
PCollection<TableRow> bqResult =
p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
PCollection result = runJob(bqResult, boolean and string
params);
// store results
}

and

private static PCollection runJob(PCollection<TableRow> bqResult,

...) {
return bqResult
// In this step I convert TableRow into my custom class
object
.apply("Create metrics based on sessions",
ParDo.of(new CreateSessionMetrics(boolean and
string params)))
   // few more transformations

}

This is basically similar to examples you can find here
https://beam.apache.org/documentation/io/built-in/google-bigquery/

On Wed, 8 Jul 2020 at 23:31, Jeff Klukas  wrote:

> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich 
> wrote:
>
>> So from what I understand, it works like this by design and it's not
>> possible to test my code with the current coder implementation. Is that
>> correct?
>>
>
> I would argue that this test failure is indicating an area of potential
> failure in your code that should be addressed. It may be that your current
> production pipeline relies on fusion which is not guaranteed by the Beam
> model, and so the pipeline could fail if the runner makes an internal
> change that affects fusion (in practice this is unlikely).
>
> Is it possible to update your code such that it does not need to make
> assumptions about the concrete Map type returned by TableRow objects?
>


-- 
Best Regards,
Kirill


Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Kirill Zhdanovich
tStream.writeSerialData(ObjectOutputStream.java:1509)
23:11:11.343 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
23:11:11.343 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
23:11:11.343 [ERROR] [system.err] at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
23:11:11.343 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
23:11:11.343 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
23:11:11.343 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
23:11:11.344 [ERROR] [system.err] at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
23:11:11.344 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
23:11:11.344 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
23:11:11.344 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
23:11:11.344 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
23:11:11.344 [ERROR] [system.err] at
org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
23:11:11.344 [ERROR] [system.err] ... 22 more
23:11:11.393 [ERROR]
[org.gradle.internal.buildevents.BuildExceptionReporter]
23:11:11.393 [ERROR]
[org.gradle.internal.buildevents.BuildExceptionReporter] FAILURE: Build
failed with an exception.

On Wed, 8 Jul 2020 at 22:58, Luke Cwik  wrote:

> Can you provide the full stacktrace?
>
> On Wed, Jul 8, 2020 at 12:33 PM Rui Wang  wrote:
>
>> Tried some code search in Beam repo but I didn't find the exact line
>> of code that throws your exception.
>>
>> However, I believe for Java classes you use in primitives (ParDo,
>> CombineFn) and coders, it's very likely you need to make them
>> serializable (i.e. implements Serializable).
>>
>>
>> -Rui
>>
>> On Wed, Jul 8, 2020 at 6:23 AM Kirill Zhdanovich 
>> wrote:
>> >
>> > Hi!
>> > I'm using Apache Beam Java (2.19.0) with Dataflow. I created a class and
>> annotated it with DefaultCoder
>> >
>> > @DefaultCoder(AvroCoder.class)
>> > public class ProductCatalog {
>> >
>> > When I try to submit it to the cluster I get an error:
>> >
>> > Caused by: java.io.NotSerializableException: ...common.ProductCatalog
>> >
>> > If I add `implements Serializable` to the class definition everything
>> works fine. In the Apache Beam guide, I don't see anything about using
>> implements Serializable. What am I doing wrong? Thank you in advance for
>> your help
>>
>

-- 
Best Regards,
Kirill


Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
So from what I understand, it works like this by design and it's not
possible to test my code with the current coder implementation. Is that
correct?

On Wed, 8 Jul 2020 at 21:41, Jeff Klukas  wrote:

> On Wed, Jul 8, 2020 at 1:38 PM Kirill Zhdanovich 
> wrote:
>
>> So it's a correct implementation of TableRow that encode(decode(a)) != a?
>>
>
> A TableRow can contain fields of any map implementation. It makes sense to
> me that once a TableRow object is serialized and deserialized, that the
> coder must make a choice about a concrete Map implementation to use.
>
> So, no I would not expect that a decoded TableRow would contain exactly
> the same objects as before encoding. But I _would_ expect that
> encode(decode(a)).equals(a) in the sense that Map.equals() can determine
> two maps of different types to be equal as long as both maps contain
> entries that are equal to one another.
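>
> For instance, a quick sketch of that part of the Map contract:
>
> Map<String, Object> a = new HashMap<>();
> Map<String, Object> b = new LinkedHashMap<>();
> a.put("isMobile", true);
> b.put("isMobile", true);
> // Prints true: equals() compares entry sets, not concrete classes.
> System.out.println(a.equals(b));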
>
>
>

-- 
Best Regards,
Kirill


Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
So it's a correct implementation of TableRow that encode(decode(a)) != a?

On Wed, 8 Jul 2020 at 19:03, Jeff Klukas  wrote:

> The test runner intentionally does some ugly things in order to expose
> problems which might otherwise be missed. In particular, I believe the test
> runner enforces coding between each transform and scrambles the order of
> elements, whereas production pipelines will often have many transforms fused
> together without serializing data.
>
> > I've tried TableRowJsonCoder, but it seems like it converts all objects
> inside TableRow to LinkedHashMaps
>
> This is likely intended. I would expect only the top-level container to be
> a TableRow and that nested maps would be some other map type.
>
> On Wed, Jul 8, 2020 at 10:52 AM Kirill Zhdanovich 
> wrote:
>
>> Hi Jeff,
>> It's a simple pipeline that takes a PCollection of TableRow selected from
>> the Google Analytics export to BigQuery. So each TableRow follows this
>> schema: https://support.google.com/analytics/answer/3437719?hl=en
>> I have part of the code doing casting to TableRow like this:
>>
>> Boolean isMobile = (Boolean) (((TableRow) 
>> row.get("device")).get("isMobile"));
>>
>> or
>>
>> List<Hit> hits = ((List<TableRow>)
>> row.get("hits")).stream().map(Hit::new).collect(Collectors.toList());
>>
>> I don't have issues running this pipeline in production. I have this
>> issue only when I try to write an end-to-end test.
>> Do you know if there are existing coders for TableRow that I can
>> use? I've tried TableRowJsonCoder, but it seems like it converts all
>> objects inside TableRow to LinkedHashMaps
>>
>> On Wed, 8 Jul 2020 at 17:30, Jeff Klukas  wrote:
>>
>>> Kirill - Can you tell us more about what Job.runJob is doing? I would
>>> not expect the Beam SDK itself to do any casting to TableRow, so is there a
>>> line in your code where you're explicitly casting to TableRow? There may be
>>> a point where you need to explicitly set the coder on a PCollection to
>>> deserialize back to TableRow objects.
>>>
>>> On Wed, Jul 8, 2020 at 10:11 AM Kirill Zhdanovich 
>>> wrote:
>>>
>>>> Here is a code example:
>>>>
>>>> List<TableRow> ss = Arrays.asList(session1, session2);
>>>> PCollection<TableRow> sessions = p.apply(Create.of(ss));
>>>> PCollection res = Job.runJob(sessions, "20200614", 
>>>> false, new ProductCatalog());
>>>> p.run();
>>>>
>>>>
>>>> On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I want to test a pipeline and the input for it is a PCollection of
>>>>> TableRows. I've created a test, and when I run it I get an error:
>>>>>
>>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>>>>> cast to class com.google.api.services.bigquery.model.TableRow
>>>>>
>>>>> Is it a known issue? Thank you in advance
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Kirill
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Kirill
>>>>
>>>
>>
>> --
>> Best Regards,
>> Kirill
>>
>

-- 
Best Regards,
Kirill


Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
Hi Jeff,
It's a simple pipeline that takes a PCollection of TableRow selected from
the Google Analytics export to BigQuery. So each TableRow follows this
schema: https://support.google.com/analytics/answer/3437719?hl=en
I have part of the code doing casting to TableRow like this:

Boolean isMobile = (Boolean) (((TableRow) row.get("device")).get("isMobile"));

or

List<Hit> hits = ((List<TableRow>)
row.get("hits")).stream().map(Hit::new).collect(Collectors.toList());

I don't have issues running this pipeline in production. I have this issue
only when I try to write an end-to-end test.
Do you know if there are existing coders for TableRow that I can use? I've
tried TableRowJsonCoder, but it seems like it converts all objects inside
TableRow to LinkedHashMaps.

On Wed, 8 Jul 2020 at 17:30, Jeff Klukas  wrote:

> Kirill - Can you tell us more about what Job.runJob is doing? I would not
> expect the Beam SDK itself to do any casting to TableRow, so is there a
> line in your code where you're explicitly casting to TableRow? There may be
> a point where you need to explicitly set the coder on a PCollection to
> deserialize back to TableRow objects.
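>
> For the test input in this thread, that would look something like the
> sketch below. Note that pinning TableRowJsonCoder exercises the same
> serialization path as the test runner, so nested records will come back
> as plain Maps rather than TableRows:
>
> PCollection<TableRow> sessions =
>     p.apply(Create.of(ss).withCoder(TableRowJsonCoder.of()));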
>
> On Wed, Jul 8, 2020 at 10:11 AM Kirill Zhdanovich 
> wrote:
>
>> Here is a code example:
>>
>> List<TableRow> ss = Arrays.asList(session1, session2);
>> PCollection<TableRow> sessions = p.apply(Create.of(ss));
>> PCollection res = Job.runJob(sessions, "20200614", 
>> false, new ProductCatalog());
>> p.run();
>>
>>
>> On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich 
>> wrote:
>>
>>> Hi,
>>> I want to test a pipeline and the input for it is a PCollection of
>>> TableRows. I've created a test, and when I run it I get an error:
>>>
>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>>> cast to class com.google.api.services.bigquery.model.TableRow
>>>
>>> Is it a known issue? Thank you in advance
>>>
>>> --
>>> Best Regards,
>>> Kirill
>>>
>>
>>
>> --
>> Best Regards,
>> Kirill
>>
>

-- 
Best Regards,
Kirill


Re: TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
Here is a code example:

List<TableRow> ss = Arrays.asList(session1, session2);
PCollection<TableRow> sessions = p.apply(Create.of(ss));
PCollection res = Job.runJob(sessions,
"20200614", false, new ProductCatalog());
p.run();


On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich 
wrote:

> Hi,
> I want to test a pipeline and the input for it is a PCollection of TableRows.
> I've created a test, and when I run it I get an error:
>
> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
> to class com.google.api.services.bigquery.model.TableRow
>
> Is it a known issue? Thank you in advance
>
> --
> Best Regards,
> Kirill
>


-- 
Best Regards,
Kirill


TableRow class is not the same after serialization

2020-07-08 Thread Kirill Zhdanovich
Hi,
I want to test a pipeline and the input for it is a PCollection of TableRows.
I've created a test, and when I run it I get an error:

java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
to class com.google.api.services.bigquery.model.TableRow

Is it a known issue? Thank you in advance

-- 
Best Regards,
Kirill


Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Kirill Zhdanovich
Hi!
I'm using Apache Beam Java (2.19.0) with Dataflow. I created a class and
annotated it with DefaultCoder

@DefaultCoder(AvroCoder.class)
public class ProductCatalog {

When I try to submit it to the cluster I get an error:

Caused by: java.io.NotSerializableException: ...common.ProductCatalog

If I add `implements Serializable` to the class definition everything works
fine. In the Apache Beam guide, I don't see anything about using implements
Serializable. What am I doing wrong? Thank you in advance for your help
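
For reference, a minimal sketch of the fix described above (the field is
illustrative, and any nested types, such as Product, need the same
treatment):

import java.io.Serializable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

// @DefaultCoder controls how elements are encoded inside PCollections;
// implements Serializable is what lets the object travel inside a DoFn's
// serialized closure. The two mechanisms are independent.
@DefaultCoder(AvroCoder.class)
public class ProductCatalog implements Serializable {
  private String name;  // illustrative field
}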