[ https://issues.apache.org/jira/browse/FLINK-12303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841258#comment-16841258 ]

Aljoscha Krettek commented on FLINK-12303:
------------------------------------------

Hi [~snilard], yes, you're right, this wasn't the real issue. I'm sorry about 
that.

The real problem is that Scala 2.12 changes how Scala lambdas are implemented: 
they now use the same underlying mechanism as Java lambdas, which is available 
from Java 8 onwards. Kryo, which is used as the serializer in your test cases, 
used to be able to serialize Scala lambdas, but it can't do that anymore for 
Scala 2.12 lambdas.
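
To make this concrete, here is a minimal sketch. The {{demo.LambdaCaseClass}} and 
{{demo.TsEventClass}} names and their {{lambda}} fields come from the stack trace 
below; the remaining fields and the pipeline are my assumptions about what the 
reproducer roughly looks like, not the reporter's exact code. The point is that a 
function-valued field in an event class falls back to Flink's generic Kryo 
serializer, and under 2.12 the stored value is a Java-8-style lambda:
{code}
// Hypothetical reconstruction of the event classes named in the trace below;
// everything apart from the function-valued fields is an assumption.
package demo

case class LambdaCaseClass(lambda: Int => Int)             // function-valued field
case class TsEventClass(ts: Long, lambda: LambdaCaseClass)

object LambdaDemo {
  import org.apache.flink.streaming.api.scala._

  def main(args: Array[String]): Unit = {
    // Under Scala 2.12 this prints a LambdaMetafactory-generated class name
    // containing "$$Lambda$" (the same mechanism Java 8 lambdas use); under
    // 2.11 it would be a plain anonymous class containing "$anonfun".
    val f: Int => Int = _ + 1
    println(f.getClass.getName)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The case classes get Flink's case-class serializer, but the Int => Int
    // field has no dedicated serializer and falls back to Kryo, which is
    // where the NullPointerException shown below comes from at runtime.
    env
      .fromElements(TsEventClass(1L, LambdaCaseClass(f)))
      .map(e => e.lambda.lambda(41))
      .print()
    env.execute("lambda-in-event-class")
  }
}
{code}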

You can see the exception if you enable logging, for example by adding these 
dependencies to your {{build.sbt}}:
{code}
  "org.slf4j" % "slf4j-log4j12" % "1.7.7" % "runtime",
  "log4j" % "log4j" % "1.2.17" % "runtime"
{code}

and by putting a {{log4j.properties}} in {{src/resources}}:
{code}
log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
{code}

This is the exception you will then find in the logs:
{code}
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
lambda (demo.LambdaCaseClass)
lambda (demo.TsEventClass)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
        at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:224)
        at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
        at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:33)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPunctuatedWatermark(AbstractFetcher.java:459)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:404)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
        at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
        ... 26 more


{code}
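
A possible way to work around this, sketched with the same hypothetical class 
names as above (a sketch under those assumptions, not a definitive fix): keep 
function values out of the records that flow through the stream and pass them 
to the operator instead, so Kryo never has to serialize a lambda.
{code}
// Workaround sketch (hypothetical names): the event class carries data only,
// and the function travels with the user code rather than with the records.
package demo

case class TsEvent(ts: Long, value: Int)   // no function-valued fields

object WorkaroundDemo {
  import org.apache.flink.streaming.api.scala._

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The lambda is part of the operator's closure and is shipped via Java
    // serialization of the user function, which handles 2.12 lambdas; the
    // records themselves stay plain case classes.
    val transform: Int => Int = _ + 1

    env
      .fromElements(TsEvent(1L, 41), TsEvent(2L, 99))
      .map(e => e.copy(value = transform(e.value)))
      .print()

    env.execute("lambda-outside-event-class")
  }
}
{code}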

> Scala 2.12 lambdas do not work in event classes inside streams.
> -----------------------------------------------------------------
>
>                 Key: FLINK-12303
>                 URL: https://issues.apache.org/jira/browse/FLINK-12303
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, API / Scala
>    Affects Versions: 1.7.2
>         Environment: Scala 2.11/2.12, Oracle Java 1.8.0_172
>            Reporter: Matěj Novotný
>            Priority: Major
>
> When you use lambdas inside event classes that are used in streams, it works 
> in Scala 2.11 but stopped working in Scala 2.12. The code compiles, but it 
> does not process any data and does not throw any exception. I would expect it 
> either not to compile when an event class contains an unsupported field, or 
> at least to throw an exception at runtime.
>  
> For more detail check my demonstration repo, please: 
> [https://github.com/matej-novotny/flink-lambda-bug]


