[ https://issues.apache.org/jira/browse/FLINK-16289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gary Yao updated FLINK-16289:
-----------------------------
    Component/s:     (was: Deployment / Kubernetes)

> Missing serialVersionUID blocks running SQL group by in Kubernetes.
> -------------------------------------------------------------------
>
>                 Key: FLINK-16289
>                 URL: https://issues.apache.org/jira/browse/FLINK-16289
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.10.0
>            Reporter: Niels Basjes
>            Priority: Major
>
> I have written a Flink 1.10 job that reads a file (using the S3 Presto
> client), applies an SQL statement to it (with
> [Yauaa|https://yauaa.basjes.nl/UDF-ApacheFlinkTable.html] as a UDF) and then
> tries to write the result to ElasticSearch.
> The problem is that when I submit this job to the native Kubernetes cluster I
> get this exception (full stack trace below):
> {code:java}
> java.io.InvalidClassException: org.apache.flink.table.codegen.GeneratedAggregationsFunction; local class incompatible: stream classdesc serialVersionUID = 1538379512770243128, local class serialVersionUID = -5485442333060060467
> {code}
> According to [this Stack Overflow answer|https://stackoverflow.com/a/10378907/114196],
> this error stems from the JVM automatically generating a serialVersionUID when
> the class does not declare one, and that generated value can be JDK/JRE
> version dependent.
> On my local machine (Ubuntu 16.04 LTS) I use openjdk-9-jdk.
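The mechanism described in that answer can be illustrated with a minimal Java sketch. The classes `ImplicitId` and `ExplicitId` are hypothetical stand-ins, not Flink classes. `ObjectStreamClass.lookup(...).getSerialVersionUID()` reports the UID that Java serialization compares during deserialization: a computed default when the class declares none, the declared constant otherwise. The sketch only shows where the value comes from; it does not reproduce the mismatch seen for `GeneratedAggregationsFunction`, which requires two differently built copies of the same class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionDemo {

    // Hypothetical class with no serialVersionUID: serialization computes a default
    // from details of the class (name, modifiers, interfaces, fields, non-private
    // methods and constructors). Compiler-generated members are included, so a
    // different compiler or compiler version can yield a different value.
    static class ImplicitId implements Serializable {
        int count = 1;
    }

    // Hypothetical class with an explicit serialVersionUID: the value is fixed in
    // source, so it does not depend on how the .class file was produced.
    static class ExplicitId implements Serializable {
        private static final long serialVersionUID = 1L;
        int count = 1;
    }

    /** Returns the serialVersionUID that Java serialization uses for the class. */
    static long uidOf(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }

    /** Serializes an object to an in-memory byte array and reads it back. */
    static Object roundTrip(Serializable obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            // readObject compares the UID recorded in the stream with the UID of the
            // local class and throws java.io.InvalidClassException if they differ.
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("ImplicitId UID (computed): " + uidOf(ImplicitId.class));
        System.out.println("ExplicitId UID (declared): " + uidOf(ExplicitId.class));
        ExplicitId copy = (ExplicitId) roundTrip(new ExplicitId());
        System.out.println("round-tripped count = " + copy.count);
    }
}
```

Within a single JVM the round trip always succeeds, because writer and reader share the same class. Comparing the printed values on two JVMs whose classpaths hold differently compiled copies of a class is one way to check for a mismatch; the JDK's `serialver` command-line tool (`serialver -classpath <classpath> <class name>`) prints the same value from the command line.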
> Apparently the Flink Docker image uses JRE 1.8:
> {code}
> ....KubernetesTaskExecutorRunner - --------------------------------------------------------------------------------
> ....KubernetesTaskExecutorRunner - Starting Kubernetes TaskExecutor runner (Version: 1.10.0, Rev:aa4eb8f, Date:07.02.2020 @ 19:18:19 CET)
> ....KubernetesTaskExecutorRunner - OS current user: root
> ....KubernetesTaskExecutorRunner - Current Hadoop/Kerberos user: <no hadoop dependency found>
> ....KubernetesTaskExecutorRunner - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.242-b08
> {code}
> I have tried doing the same with JDK 1.8 on my machine, but that still does
> not work (apparently there is still too big a difference between the Java
> versions).
> When I remove the "group by" (i.e. the aggregation) from my SQL statement,
> the job gets past this point (and right now fails on missing dependencies,
> which is a different problem).
>
> NOTE: When I run this code locally within IntelliJ it works as intended (if
> you ignore that I need to include a lot of dependencies): it connects to S3,
> reads a file, runs the SQL and writes the expected result into ElasticSearch.
> {code:java}
> 2020-02-26 11:40:48,303 INFO org.apache.flink.runtime.taskmanager.Task - groupBy: (useragent, DeviceClass, AgentNameVersionMajor), window: (TumblingGroupWindow('w$, 'EventTime, 3600000.millis)), select: (useragent, DeviceClass, AgentNameVersionMajor, SUM(clicks) AS clicks, SUM(visitors) AS visitors, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> select: (w$start AS wStart, useragent, DeviceClass, AgentNameVersionMajor, clicks, visitors) -> to: Row -> Sink: Unnamed (11/32) (db5cb408a1b286e705a2e3e30ac8131e) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:269)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:115)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.InvalidClassException: org.apache.flink.table.codegen.GeneratedAggregationsFunction; local class incompatible: stream classdesc serialVersionUID = 1538379512770243128, local class serialVersionUID = -5485442333060060467
>     at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1940)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1806)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2097)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2342)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2266)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2124)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:254)
>     ... 6 more
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)