Hi Praneeth,

It does look like a failure constructing the serializer. Can you share the serialization config you use for the Kafka producer? In particular, are you using a custom serializer? Do you use any custom classloading configuration?
Best regards,
Nico

On Wed, Sep 1, 2021 at 11:38 PM Praneeth Ramesh <sr.prane...@gmail.com> wrote:

> Hi All
>
> I am trying to run a Flink Scala application which reads from Kafka, applies some lookup transformations, and then writes to Kafka.
>
> I am using Flink version 1.12.1.
>
> I tested it locally and it works fine. But when I try to run it on a cluster using the native Kubernetes integration, I see weird errors like the one below.
>
> The cluster also looks fine, because I tried to run a wordcount application on the cluster and it worked fine.
>
> The exception is not clear, and the stack trace shows only the TaskManager stack, so I have no idea where in the application the problem could be. Could this be a serialization issue? Is there a way to debug such issues and find the actual point in application code where there is a problem?
>
> ```
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer.
>     at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.io.IOException: unexpected exception type
>     at java.io.ObjectStreamClass.throwMiscException(Unknown Source) ~[?:?]
>     at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     ... 8 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.ClassNotFoundException: __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
>     at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3557) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2302) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2289) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>     at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     ... 8 more
> Caused by: java.lang.ClassNotFoundException: __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
>     at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:64) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>     at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>     at java.lang.Class.forName0(Native Method) ~[?:?]
>     at java.lang.Class.forName(Unknown Source) ~[?:?]
>     at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:261) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:359) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:426) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.compileCbfInternal(TraversableSerializer.scala:230) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:220) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
>     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>     at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>     at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>     at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
>     ... 8 more
> ```
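On the question of how to find the actual point of failure: the innermost `Caused by` entry of a trace like the one above is usually the most informative one. Here it is the `ClassNotFoundException` raised while `TraversableSerializer.readObject` recompiles its collection builder at runtime, which suggests the failure happens while the serializer itself is being deserialized on the TaskManager rather than in a specific line of user code. The following is only a minimal Java sketch of walking a cause chain programmatically; `RootCause` is a hypothetical helper, not part of Flink, and the exceptions in `main` are stand-ins that mimic the nesting in the trace.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Flink class): walks an exception's cause chain. */
final class RootCause {

    /** Returns every throwable in the chain, outermost first. */
    static List<Throwable> chain(Throwable top) {
        List<Throwable> out = new ArrayList<>();
        // Stop at null, or at a throwable already seen (guards against cyclic chains).
        for (Throwable t = top; t != null && !out.contains(t); t = t.getCause()) {
            out.add(t);
        }
        return out;
    }

    /** Returns the innermost cause, i.e. the last "Caused by" of a printed trace. */
    static Throwable of(Throwable top) {
        List<Throwable> all = chain(top);
        if (all.isEmpty()) {
            throw new IllegalArgumentException("top must not be null");
        }
        return all.get(all.size() - 1);
    }

    public static void main(String[] args) {
        // Stand-in exceptions only; the messages are illustrative.
        Throwable root = new ClassNotFoundException("SomeGeneratedClass");
        Throwable mid = new java.io.IOException("unexpected exception type", root);
        Throwable top = new RuntimeException("Could not instantiate serializer.", mid);

        for (Throwable t : chain(top)) {
            System.out.println(t.getClass().getName() + ": " + t.getMessage());
        }
        System.out.println("root cause: " + of(top).getClass().getSimpleName());
    }
}
```

For a trace that is already printed in a log, reading the last `Caused by` block and the first few frames under it gives the same information without any code.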