Re: TableRow class is not the same after serialization
Cool! Thanks a lot for your explanation and your time, Jeff, very much
appreciated.

On Thu, 9 Jul 2020 at 17:27, Jeff Klukas wrote:

> On Thu, Jul 9, 2020 at 10:18 AM Kirill Zhdanovich wrote:
>
>> So I guess I need to switch to Map instead of TableRow?
>
> Yes, I would definitely recommend that you switch to Map.
> That's the most basic interface, and every deserialization of a top-level
> TableRow object must provide objects matching that interface wherever the
> BQ schema has a nested STRUCT/RECORD.
>
> Note that the latest docs for BigQueryIO do include a table that maps BQ
> types to Java types, but unfortunately that table lists STRUCTs as mapping
> to Avro GenericRecord, which doesn't give you the info you need to
> understand the Map interface inside TableRows:
>
> https://beam.apache.org/releases/javadoc/2.22.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html
>
> You may want to file a JIRA ticket requesting more explicit documentation
> about how to parse structs out of TableRow objects.
>
>> On Thu, 9 Jul 2020 at 17:13, Jeff Klukas wrote:
>>
>>> It looks like the fact that your pipeline in production produces nested
>>> TableRows is an artifact of the following decision within BigQueryIO
>>> logic:
>>>
>>> https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350
>>>
>>> The convertGenericRecordToTableRow function is used recursively for
>>> RECORD-type fields, so you end up with nested TableRows in the
>>> PCollection returned from BigQueryIO.read. But then the TableRowJsonCoder
>>> uses a Jackson ObjectMapper, which makes different decisions as to what
>>> map type to use.
>>>
>>> > Thanks for explaining. Is it documented somewhere that TableRow
>>> contains Map?
>>>
>>> I don't see that explicitly spelled out anywhere. If you follow the
>>> trail of links from TableRow, you'll get to these docs about Google's
>>> JSON handling in Java, which may or may not be relevant to this question:
>>>
>>> https://googleapis.github.io/google-http-java-client/json.html
>>>
>>> On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich wrote:
>>>
>>>> Thanks for explaining. Is it documented somewhere that TableRow
>>>> contains Map?
>>>> I don't construct it, I fetch from the Google Analytics export to a
>>>> BigQuery table.
>>>>
>>>> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas wrote:
>>>>
>>>>> I would expect the following line to fail:
>>>>>
>>>>> List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>>>
>>>>> The top-level bigQueryRow will be a TableRow, but
>>>>> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some
>>>>> class that implements `Map`. So that line needs to become:
>>>>>
>>>>> List<Map<String, Object>> h = ((List<Map<String, Object>>) bigQueryRow.get("hits"));
>>>>>
>>>>> And then your constructor for Hit must accept a Map<String, Object>
>>>>> rather than a TableRow.
>>>>>
>>>>> I imagine that TableRow is only intended to be used as a top-level
>>>>> object. Each row you get from a BQ result is a TableRow, but objects
>>>>> nested inside it are not logically table rows; they're BQ structs that
>>>>> are modeled as maps in JSON and Map in Java.
>>>>>
>>>>> Are you manually constructing TableRow objects with nested TableRows?
>>>>> I would expect that a result from BigQueryIO.read() would give a
>>>>> TableRow with some other map type for nested structs, so I'm surprised
>>>>> that this cast works in some contexts.
>>>>>
>>>>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich <kzhdanov...@gmail.com> wrote:
>>>>>
>>>>>> I changed the code a little bit to not use lambdas.
>>>>>>
>>>>>> Session(TableRow bigQueryRow, ProductCatalog productCatalog) {
>>>>>>   List<TableRow> h = ((List<TableRow>) bigQueryRow.get("hits"));
>>>>>>   List<Hit> hits = new ArrayList<>();
>>>>>>
>>>>>>   for (TableRow tableRow : h) { <-- breaks here
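Jeff's recommendation above can be sketched with plain JDK types, no Beam dependency needed. This is illustrative only: the `hitTypes` helper and the `hits`/`type` fields are assumptions standing in for the thread's real GA schema, and `LinkedHashMap` stands in for whatever concrete map the coder happens to produce:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapBasedParsing {
    // Accept the Map interface, not a concrete class: after a coder
    // round trip, nested records arrive as LinkedHashMap, not TableRow.
    static List<String> hitTypes(Map<String, Object> bigQueryRow) {
        @SuppressWarnings("unchecked")
        List<Map<String, Object>> hits =
                (List<Map<String, Object>>) bigQueryRow.get("hits");
        List<String> types = new ArrayList<>();
        for (Map<String, Object> hit : hits) {
            types.add((String) hit.get("type")); // "type" is a hypothetical field
        }
        return types;
    }

    public static void main(String[] args) {
        // Simulate a deserialized row: nested structs are plain maps.
        Map<String, Object> hit = new LinkedHashMap<>();
        hit.put("type", "PAGE");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("hits", List.of(hit));

        // Works no matter which Map implementation the row contains.
        System.out.println(hitTypes(row)); // [PAGE]
    }
}
```

Because only the `Map` interface is assumed, the same constructor logic works both on fresh `TableRow` objects from BigQueryIO.read and on `LinkedHashMap`s produced by TableRowJsonCoder in tests.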
Re: TableRow class is not the same after serialization
So I guess I need to switch to Map instead of TableRow? On Thu, 9 Jul 2020 at 17:13, Jeff Klukas wrote: > It looks like the fact that your pipeline in production produces nested > TableRows is an artifact of the following decision within BigQueryIO logic: > > > https://github.com/apache/beam/blob/1a260cd70117b77e33442f0be8f213363a2db5fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java#L348-L350 > > The convertGenericRecordToTableRow function is used recursively for > RECORD-type fields, so you end up with nested TableRows in the PCollection > returned from BigQueryIO.read. But then the TableRowJsonCoder uses a > Jackson ObjectMapper, which makes different decisions as to what map type > to use. > > > Thanks for explaining. Is it documented somewhere that TableRow contains > Map? > > I don't see that explicitly spelled out anywhere. If you follow the trail > of links from TableRow, you'll get to these docs about Google's JSON > handling in Java, which may or may not be relevant to this question: > > https://googleapis.github.io/google-http-java-client/json.html > > > > On Thu, Jul 9, 2020 at 10:02 AM Kirill Zhdanovich > wrote: > >> Thanks for explaining. Is it documented somewhere that TableRow contains >> Map? >> I don't construct it, I fetch from Google Analytics export to BigQuery >> table. >> >> On Thu, 9 Jul 2020 at 16:40, Jeff Klukas wrote: >> >>> I would expect the following line to fail: >>> >>> List h = ((List) bigQueryRow.get("hits")); >>> >>> The top-level bigQueryRow will be a TableRow, but >>> `bigQueryRow.get("hits")` is only guaranteed to be an instance of some >>> class that implements `Map`. So that line needs to become: >>> >>> List h = ((List) >>> bigQueryRow.get("hits")); >>> >>> And then your constructor for Hit must accept a Map >>> rather than a TableRow. >>> >>> I imagine that TableRow is only intended to be used as a top-level >>> object. 
Each row you get from a BQ result is a TableRow, but objects nested >>> inside it are not logically table rows; they're BQ structs that are modeled >>> as maps in JSON and Map in Java. >>> >>> Are you manually constructing TableRow objects with nested TableRows? I >>> would expect that a result from BigQueryIO.read() would give a TableRow >>> with some other map type for nested structs, so I'm surprised that this >>> cast works in some contexts. >>> >>> On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich >>> wrote: >>> >>>>I changed code a little bit not to use lambdas. >>>> >>>>Session(TableRow bigQueryRow, ProductCatalog productCatalog) { >>>> List h = ((List) bigQueryRow.get("hits")); >>>> List hits = new ArrayList<>(); >>>> >>>> for (TableRow tableRow : h) { <-- breaks here >>>> hits.add(new Hit(tableRow)); >>>> } >>>> ... >>>> } >>>> >>>> Stack trace >>>> >>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be >>>> cast to class com.google.api.services.bigquery.model.TableRow >>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; >>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of >>>> loader 'app') >>>> org.apache.beam.sdk.Pipeline$PipelineExecutionException: >>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast >>>> to class com.google.api.services.bigquery.model.TableRow >>>> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; >>>> com.google.api.services.bigquery.model.TableRow is in unnamed module of >>>> loader 'app') >>>> at >>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348) >>>> at >>>> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318) >>>> at >>>> org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213) >>>> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) >>>> at 
org.apache.beam.sdk.Pipeline.run(Pipeline.java:315) >>>> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350) >>>> at org.apache.
Re: TableRow class is not the same after serialization
Thanks for explaining. Is it documented somewhere that TableRow contains Map? I don't construct it, I fetch from Google Analytics export to BigQuery table. On Thu, 9 Jul 2020 at 16:40, Jeff Klukas wrote: > I would expect the following line to fail: > > List h = ((List) bigQueryRow.get("hits")); > > The top-level bigQueryRow will be a TableRow, but > `bigQueryRow.get("hits")` is only guaranteed to be an instance of some > class that implements `Map`. So that line needs to become: > > List h = ((List) > bigQueryRow.get("hits")); > > And then your constructor for Hit must accept a Map rather > than a TableRow. > > I imagine that TableRow is only intended to be used as a top-level object. > Each row you get from a BQ result is a TableRow, but objects nested inside > it are not logically table rows; they're BQ structs that are modeled as > maps in JSON and Map in Java. > > Are you manually constructing TableRow objects with nested TableRows? I > would expect that a result from BigQueryIO.read() would give a TableRow > with some other map type for nested structs, so I'm surprised that this > cast works in some contexts. > > On Wed, Jul 8, 2020 at 5:44 PM Kirill Zhdanovich > wrote: > >>I changed code a little bit not to use lambdas. >> >>Session(TableRow bigQueryRow, ProductCatalog productCatalog) { >> List h = ((List) bigQueryRow.get("hits")); >> List hits = new ArrayList<>(); >> >> for (TableRow tableRow : h) { <-- breaks here >> hits.add(new Hit(tableRow)); >> } >> ... 
>> } >> >> Stack trace >> >> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be >> cast to class com.google.api.services.bigquery.model.TableRow >> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; >> com.google.api.services.bigquery.model.TableRow is in unnamed module of >> loader 'app') >> org.apache.beam.sdk.Pipeline$PipelineExecutionException: >> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast >> to class com.google.api.services.bigquery.model.TableRow >> (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; >> com.google.api.services.bigquery.model.TableRow is in unnamed module of >> loader 'app') >> at >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348) >> at >> org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318) >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213) >> at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67) >> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315) >> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:350) >> at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:331) >> at >> com.ikea.search.ab.bootstrap.JobTest.testNumberOfSessionsIsCorrect(JobTest.java:78) >> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native >> Method) >> at >> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.base/java.lang.reflect.Method.invoke(Method.java:566) >> at >> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) >> at >> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) >> at >> 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) >> at >> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) >> at >> org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319) >> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) >> at >> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) >> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) >> at >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) >> at >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) >> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) >> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) >> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) >> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) >>
Re: TableRow class is not the same after serialization
va:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:175) at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:157) at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404) at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63) at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast to class com.google.api.services.bigquery.model.TableRow (java.util.LinkedHashMap is in module java.base of loader 'bootstrap'; com.google.api.services.bigquery.model.TableRow is in unnamed module of loader 'app') at com.ikea.search.ab.bootstrap.Session.(Session.java:35) at com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics.processElement(Job.java:82) On Wed, 8 Jul 2020 at 23:59, Jeff Klukas wrote: > Does the stack trace tell you where specifically in the code the cast is > happening? I'm guessing there may be assumptions inside your > CreateSessionMetrics class if that's where you're manipulating the TableRow > objects. 
> > On Wed, Jul 8, 2020 at 4:44 PM Kirill Zhdanovich > wrote: > >> Interesting. All my code does is following: >> >> public static void main(String[] args) { >> PCollection bqResult = >> p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql()); >> PCollection result = runJob(bqResult, boolean and string >> params); >> // store results >> } >> >> and >> >> private static PCollection runJob(PCollection >> bqResult, >> >> ...) { >> return bqResult >> // In this step I convert TableRow into my custom class >> object >> .apply("Create metrics based on sessions", >> ParDo.of(new CreateSessionMetrics(boolean and >> string params))) >>// few more transformations >> >> } >> >> This is basically similar to examples you can find here >> https://beam.apache.org/documentation/io/built-in/google-bigquery/ >> >> On Wed, 8 Jul 2020 at 23:31, Jeff Klukas wrote: >> >>> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich >>> wrote: >>> >>>> So from what I understand, it works like this by design and it's not >>>> possible to test my code with the current coder implementation. Is that >>>> correct? >>>> >>> >>> I would argue that this test failure is indicating an area of potential >>> failure in your code that should be addressed. It may be that your current >>> production pipeline relies on fusion which is not guaranteed by the Beam >>> model, and so the pipeline could fail if the runner makes an internal >>> change that affect fusion (in practice this is unlikely). >>> >>> Is it possible to update your code such that it does not need to make >>> assumptions about the concrete Map type returned by TableRow objects? >>> >> >> >> -- >> Best Regards, >> Kirill >> > -- Best Regards, Kirill
Re: Error when using @DefaultCoder(AvroCoder.class)
Cool! Thanks a lot for your help, Luke. On Wed, 8 Jul 2020 at 23:45, Luke Cwik wrote: > The problem is that CreateSessionMetrics has an instance of > com.ikea.search.ab.common.ProductCatalog$Product as part of its > serialization closure. Does it have a member variable of that type, or > refer to one that is part of its parent's class, or ...? Should it be > marked transient? > > Marking it as serializable is necessary if you want to keep it part of > that class's closure and is unrelated to the usage of @DefaultCoder. > > Here are some articles[1, 2, 3] about Java serialization and what is > captured that could help if you want additional background. > > 1: https://www.cs.unh.edu/~charpov/programming-serial-lambda.html > 2: http://www.lifeisafile.com/Serialization-in-spark/ > 3: > https://stackoverflow.com/questions/40259196/understanding-sparks-closures-and-their-serialization > > > On Wed, Jul 8, 2020 at 1:22 PM Kirill Zhdanovich > wrote: > >> Hello Luke, >> I will try to get "22 more" soon, not sure how to it though >> >> 23:11:11.339 [ERROR] [system.err] Exception in thread "main" >> java.lang.IllegalArgumentException: unable to serialize >> DoFnWithExecutionInformation{doFn=com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics@444f44c5, >> mainOutputTag=Tag:400#6929f09b03d242ca>, >> sideInputMapping={}, >> schemaInformation=DoFnSchemaInformation{elementConverters=[]}} >> 23:11:11.339 [ERROR] [system.err] at >> org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55) >> 23:11:11.339 [ERROR] [system.err] at >> org.apache.beam.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:606) >> 23:11:11.339 [ERROR] [system.err] at >> org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator$1.translateDoFn(PrimitiveParDoSingleFactory.java:197) >> 23:11:11.339 [ERROR] [system.err] at >> org.apache.beam.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:738) 
>> 23:11:11.339 [ERROR] [system.err] at >> org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle(PrimitiveParDoSingleFactory.java:193) >> 23:11:11.340 [ERROR] [system.err] at >> org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate(PrimitiveParDoSingleFactory.java:154) >> 23:11:11.340 [ERROR] [system.err] at >> org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate(PTransformTranslation.java:417) >> 23:11:11.340 [ERROR] [system.err] at >> org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:225) >> 23:11:11.340 [ERROR] [system.err] at >> org.apache.beam.runners.core.construction.SdkComponents.registerPTransform(SdkComponents.java:157) >> 23:11:11.340 [ERROR] [system.err] at >> org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform(PipelineTranslation.java:87) >> 23:11:11.340 [ERROR] [system.err] at >> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665) >> 23:11:11.340 [ERROR] [system.err] at >> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) >> 23:11:11.340 [ERROR] [system.err] at >> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317) >> 23:11:11.340 [ERROR] [system.err] at >> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251) >> 23:11:11.341 [ERROR] [system.err] at >> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460) >> 23:11:11.341 [ERROR] [system.err] at >> org.apache.beam.runners.core.construction.PipelineTranslation.toProto(PipelineTranslation.java:59) >> 23:11:11.341 [ERROR] [system.err] at >> org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:187) >> 23:11:11.341 [ERROR] [system.err] at >> 
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795) >> 23:11:11.341 [ERROR] [system.err] at >> org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186) >> 23:11:11.341 [ERROR] [system.err] at >> org.apache.beam.sdk.Pipeline.run(Pipeline.java:315) >> 23:11:11.341 [ERROR] [system.err] at >> org.apache.beam.sdk.Pipeline.run(Pipeline.java:301) >> 23:11:11.341 [ERROR] [system.err] at >> com.ikea.search.ab.bootstrap.Job.testPipeline(Job.java:185) >> 23:11:11.341 [ERROR] [system.err] at >> com.ikea.search.ab.bootstrap.Job.main(Job.java:196) >> 23:11:11.342 [ERROR] [system.err] C
Re: TableRow class is not the same after serialization
Interesting. All my code does is the following:

public static void main(String[] args) {
    PCollection<TableRow> bqResult =
        p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());
    PCollection result = runJob(bqResult, boolean and string params);
    // store results
}

and

private static PCollection runJob(PCollection<TableRow> bqResult, ...) {
    return bqResult
        // In this step I convert TableRow into my custom class object
        .apply("Create metrics based on sessions",
            ParDo.of(new CreateSessionMetrics(boolean and string params)))
        // few more transformations
}

This is basically similar to the examples you can find here:
https://beam.apache.org/documentation/io/built-in/google-bigquery/

On Wed, 8 Jul 2020 at 23:31, Jeff Klukas wrote:

> On Wed, Jul 8, 2020 at 3:54 PM Kirill Zhdanovich wrote:
>
>> So from what I understand, it works like this by design and it's not
>> possible to test my code with the current coder implementation. Is that
>> correct?
>
> I would argue that this test failure is indicating an area of potential
> failure in your code that should be addressed. It may be that your current
> production pipeline relies on fusion, which is not guaranteed by the Beam
> model, and so the pipeline could fail if the runner makes an internal
> change that affects fusion (in practice this is unlikely).
>
> Is it possible to update your code such that it does not need to make
> assumptions about the concrete Map type returned by TableRow objects?

--
Best Regards,
Kirill
Re: Error when using @DefaultCoder(AvroCoder.class)
tStream.writeSerialData(ObjectOutputStream.java:1509) 23:11:11.343 [ERROR] [system.err] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 23:11:11.343 [ERROR] [system.err] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 23:11:11.343 [ERROR] [system.err] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 23:11:11.343 [ERROR] [system.err] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 23:11:11.343 [ERROR] [system.err] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 23:11:11.343 [ERROR] [system.err] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 23:11:11.344 [ERROR] [system.err] at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 23:11:11.344 [ERROR] [system.err] at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 23:11:11.344 [ERROR] [system.err] at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 23:11:11.344 [ERROR] [system.err] at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 23:11:11.344 [ERROR] [system.err] at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 23:11:11.344 [ERROR] [system.err] at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51) 23:11:11.344 [ERROR] [system.err] ... 22 more 23:11:11.393 [ERROR] [org.gradle.internal.buildevents.BuildExceptionReporter] 23:11:11.393 [ERROR] [org.gradle.internal.buildevents.BuildExceptionReporter] FAILURE: Build failed with an exception. On Wed, 8 Jul 2020 at 22:58, Luke Cwik wrote: > Can you provide the full stacktrace? > > On Wed, Jul 8, 2020 at 12:33 PM Rui Wang wrote: > >> Tried some code search in Beam repo but I didn't find the exact line >> of code that throws your exception. 
>> However, I believe that for Java classes you use in primitives (ParDo,
>> CombineFn) and coders, it's very likely you need to make them
>> serializable (i.e. implements Serializable).
>>
>> -Rui
>>
>> On Wed, Jul 8, 2020 at 6:23 AM Kirill Zhdanovich wrote:
>> >
>> > Hi!
>> > I'm using Apache Beam Java (2.19.0) with Dataflow. I created a class
>> > and annotated it with DefaultCoder:
>> >
>> > @DefaultCoder(AvroCoder.class)
>> > public class ProductCatalog {
>> >
>> > When I try to submit it to the cluster I get an error:
>> >
>> > Caused by: java.io.NotSerializableException: ...common.ProductCatalog
>> >
>> > If I add `implements Serializable` to the class definition everything
>> > works fine. In the Apache Beam guide, I don't see anything about using
>> > implements Serializable. What am I doing wrong? Thank you in advance
>> > for your help

--
Best Regards,
Kirill
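Luke's and Rui's point about the serialization closure can be reproduced without Beam at all. In this minimal sketch the class names mirror the thread (`CreateSessionMetrics`, `ProductCatalog`) but are stand-ins, and `ObjectOutputStream` plays the role Beam's serialization utilities play when submitting a pipeline:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureDemo {
    // Stand-in for ProductCatalog without "implements Serializable".
    static class ProductCatalog {}

    // Stand-in for a DoFn: Java serializes the whole object graph, so
    // every non-transient field must itself be Serializable.
    static class CreateSessionMetrics implements Serializable {
        private final ProductCatalog catalog = new ProductCatalog(); // breaks serialization
    }

    // The fix: the captured field's class implements Serializable too.
    static class SerializableCatalog implements Serializable {}
    static class FixedFn implements Serializable {
        private final SerializableCatalog catalog = new SerializableCatalog();
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // NotSerializableException is an IOException
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new CreateSessionMetrics())); // false
        System.out.println(serializes(new FixedFn()));              // true
    }
}
```

Marking the field `transient` would also make the first case pass, at the cost of the field being null after deserialization, which is why it only suits state that can be rebuilt on the worker.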
Re: TableRow class is not the same after serialization
So from what I understand, it works like this by design and it's not
possible to test my code with the current coder implementation. Is that
correct?

On Wed, 8 Jul 2020 at 21:41, Jeff Klukas wrote:

> On Wed, Jul 8, 2020 at 1:38 PM Kirill Zhdanovich wrote:
>
>> So it's a correct implementation of TableRow that decode(encode(a)) != a?
>
> A TableRow can contain fields of any map implementation. It makes sense to
> me that once a TableRow object is serialized and deserialized, the coder
> must make a choice about a concrete Map implementation to use.
>
> So, no, I would not expect that a decoded TableRow would contain exactly
> the same objects as before encoding. But I _would_ expect that
> decode(encode(a)).equals(a), in the sense that Map.equals() can determine
> two maps of different types to be equal as long as both maps contain
> entries that are equal to one another.

--
Best Regards,
Kirill
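Jeff's point that a coder round trip may swap the concrete map class while preserving map equality rests on how `Map.equals` is specified: it compares entry sets, not implementation classes. A quick JDK-only check (the field names here are made up):

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class MapEqualityDemo {
    public static void main(String[] args) {
        Map<String, Object> a = new HashMap<>(Map.of("isMobile", true, "hitCount", 3));
        Map<String, Object> b = new LinkedHashMap<>(a); // different concrete class
        Map<String, Object> c = new TreeMap<>(a);       // different again

        // Map.equals() compares entries, so all three are "equal":
        System.out.println(a.equals(b) && b.equals(c)); // true

        // ...but the concrete classes differ, so a cast-based pipeline
        // that worked on one can throw ClassCastException on another:
        System.out.println(a.getClass() == b.getClass()); // false
    }
}
```

This is exactly the gap the test surfaced: equality survives the round trip, class identity does not.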
Re: TableRow class is not the same after serialization
So it's a correct implementation of TableRow that decode(encode(a)) != a?

On Wed, 8 Jul 2020 at 19:03, Jeff Klukas wrote:

> The test runner intentionally does some ugly things in order to expose
> problems which might otherwise be missed. In particular, I believe the
> test runner enforces coding between each transform and scrambles the order
> of elements, whereas production pipelines will often have many transforms
> fused together without serializing data.
>
> > I've tried TableRowJsonCoder, but it seems like it converts all objects
> inside TableRow to LinkedHashMaps
>
> This is likely intended. I would expect only the top-level container to be
> a TableRow and that nested maps would be some other map type.
>
> On Wed, Jul 8, 2020 at 10:52 AM Kirill Zhdanovich wrote:
>
>> Hi Jeff,
>> It's a simple pipeline that takes a PCollection of TableRow which is
>> selected from the Google Analytics export to BigQuery. So each TableRow
>> follows this schema:
>> https://support.google.com/analytics/answer/3437719?hl=en
>> I have parts of the code doing casting to TableRow like this:
>>
>> Boolean isMobile = (Boolean) (((TableRow) row.get("device")).get("isMobile"));
>>
>> or
>>
>> List<Hit> hits = ((List<TableRow>) row.get("hits")).stream().map(Hit::new).collect(Collectors.toList());
>>
>> I don't have issues running this pipeline in production. I have this
>> issue only when I tried to write an end-to-end test.
>> Do you know if there are existing coders for TableRow that I can use?
>> I've tried TableRowJsonCoder, but it seems like it converts all objects
>> inside TableRow to LinkedHashMaps.
>>
>> On Wed, 8 Jul 2020 at 17:30, Jeff Klukas wrote:
>>
>>> Kirill - Can you tell us more about what Job.runJob is doing? I would
>>> not expect the Beam SDK itself to do any casting to TableRow, so is
>>> there a line in your code where you're explicitly casting to TableRow?
>>> There may be a point where you need to explicitly set the coder on a
>>> PCollection to deserialize back to TableRow objects.
>>> >>> On Wed, Jul 8, 2020 at 10:11 AM Kirill Zhdanovich >>> wrote: >>> >>>> Here is a code example: >>>> >>>> List ss = Arrays.asList(session1, session2); >>>> PCollection sessions = p.apply(Create.of(ss)); >>>> PCollection res = Job.runJob(sessions, "20200614", >>>> false, new ProductCatalog()); >>>> p.run(); >>>> >>>> >>>> On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich >>>> wrote: >>>> >>>>> Hi, >>>>> I want to test pipeline and the input for it is PCollection of >>>>> TableRows. I've created a test, and when I run it I get an error: >>>>> >>>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be >>>>> cast to class com.google.api.services.bigquery.model.TableRow >>>>> >>>>> Is it a known issue? Thank you in advance >>>>> >>>>> -- >>>>> Best Regards, >>>>> Kirill >>>>> >>>> >>>> >>>> -- >>>> Best Regards, >>>> Kirill >>>> >>> >> >> -- >> Best Regards, >> Kirill >> > -- Best Regards, Kirill
Re: TableRow class is not the same after serialization
Hi Jeff,
It's a simple pipeline that takes a PCollection of TableRow which is
selected from the Google Analytics export to BigQuery. So each TableRow
follows this schema:
https://support.google.com/analytics/answer/3437719?hl=en
I have parts of the code doing casting to TableRow like this:

Boolean isMobile = (Boolean) (((TableRow) row.get("device")).get("isMobile"));

or

List<Hit> hits = ((List<TableRow>) row.get("hits")).stream().map(Hit::new).collect(Collectors.toList());

I don't have issues running this pipeline in production. I have this issue
only when I tried to write an end-to-end test.
Do you know if there are existing coders for TableRow that I can use? I've
tried TableRowJsonCoder, but it seems like it converts all objects inside
TableRow to LinkedHashMaps.

On Wed, 8 Jul 2020 at 17:30, Jeff Klukas wrote:

> Kirill - Can you tell us more about what Job.runJob is doing? I would not
> expect the Beam SDK itself to do any casting to TableRow, so is there a
> line in your code where you're explicitly casting to TableRow? There may
> be a point where you need to explicitly set the coder on a PCollection to
> deserialize back to TableRow objects.
>
> On Wed, Jul 8, 2020 at 10:11 AM Kirill Zhdanovich wrote:
>
>> Here is a code example:
>>
>> List ss = Arrays.asList(session1, session2);
>> PCollection sessions = p.apply(Create.of(ss));
>> PCollection res = Job.runJob(sessions, "20200614", false, new ProductCatalog());
>> p.run();
>>
>> On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich wrote:
>>
>>> Hi,
>>> I want to test a pipeline whose input is a PCollection of TableRows.
>>> I've created a test, and when I run it I get an error:
>>>
>>> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be
>>> cast to class com.google.api.services.bigquery.model.TableRow
>>>
>>> Is it a known issue? Thank you in advance
>>>
>>> --
>>> Best Regards,
>>> Kirill

--
Best Regards,
Kirill
Re: TableRow class is not the same after serialization
Here is a code example:

List ss = Arrays.asList(session1, session2);
PCollection sessions = p.apply(Create.of(ss));
PCollection res = Job.runJob(sessions, "20200614", false, new ProductCatalog());
p.run();

On Wed, 8 Jul 2020 at 17:07, Kirill Zhdanovich wrote:

> Hi,
> I want to test a pipeline whose input is a PCollection of TableRows. I've
> created a test, and when I run it I get an error:
>
> java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
> to class com.google.api.services.bigquery.model.TableRow
>
> Is it a known issue? Thank you in advance
>
> --
> Best Regards,
> Kirill

--
Best Regards,
Kirill
TableRow class is not the same after serialization
Hi,
I want to test a pipeline whose input is a PCollection of TableRows. I've
created a test, and when I run it I get an error:

java.lang.ClassCastException: class java.util.LinkedHashMap cannot be cast
to class com.google.api.services.bigquery.model.TableRow

Is it a known issue? Thank you in advance

--
Best Regards,
Kirill
Error when using @DefaultCoder(AvroCoder.class)
Hi!
I'm using Apache Beam Java (2.19.0) with Dataflow. I created a class and
annotated it with DefaultCoder:

@DefaultCoder(AvroCoder.class)
public class ProductCatalog {

When I try to submit it to the cluster I get an error:

Caused by: java.io.NotSerializableException: ...common.ProductCatalog

If I add `implements Serializable` to the class definition everything works
fine. In the Apache Beam guide, I don't see anything about using implements
Serializable. What am I doing wrong? Thank you in advance for your help