[https://issues.apache.org/jira/browse/FLINK-12303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841408#comment-16841408]

Matěj Novotný commented on FLINK-12303:
---------------------------------------

[~aljoscha], thank you for showing me your log4j configuration. Ours wasn't 
complex enough.
 I know that Scala 2.12 compiles lambdas to native Java 8 lambdas, and I 
understand that these new lambdas are not easy to work with.

I still think that the current Flink behaviour is not ideal. When I execute my 
test environment without catching exceptions, i.e. replacing
{code:java}
Try(env.execute()){code}
with
{code:java}
env.execute(){code}
I get a rather meaningless exception:
{code:java}
 Job execution failed.
 org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
 at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.$anonfun$applyOrElse$64(JobManager.scala:900)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
 at scala.util.Success.$anonfun$map$1(Try.scala:255)
 at scala.util.Success.map(Try.scala:213)
 at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
 at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
 at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
 at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
 at 
akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:85)
 at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
 at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
 at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
 at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
 Caused by: java.lang.NullPointerException
 at com.esotericsoftware.kryo.Kryo.getSerializer(Kryo.java:476)
 at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
 at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
 at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
 at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
 at 
org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:33)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
 at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
 at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestampAndPunctuatedWatermark(AbstractFetcher.java:459)
 at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:404)
 at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
 at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
 at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
 at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
 at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
 at java.lang.Thread.run(Thread.java:748)
{code}
I would like code that produces this exception either to fail to compile, or at 
least to throw an exception that tells me it is caused by lambdas.
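
For illustration, here is a minimal sketch of the pattern I mean. The names (`Event`, `transform`, `LambdaEventJob`) are hypothetical, not taken from the linked repository: an event case class carrying a function-typed field, which Flink serializes via the Kryo fallback. Under Scala 2.11 the field compiles to an anonymous class; under Scala 2.12 it becomes a native Java 8 lambda, which Kryo cannot handle.
{code:scala}
import org.apache.flink.streaming.api.scala._

// Hypothetical event class: the Int => Int field is the part that
// Scala 2.12 compiles to a Java 8 native lambda.
case class Event(id: Long, transform: Int => Int)

object LambdaEventJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .fromElements(Event(1L, _ + 1), Event(2L, _ * 2))
      .map(e => e.transform(42))
      .print()
    // Under Scala 2.12 this is expected to fail at runtime (the
    // NullPointerException inside KryoSerializer.copy shown above)
    // instead of being rejected at compile time.
    env.execute()
  }
}
{code}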

> Scala 2.12 lambdas does not work in event classes inside streams.
> -----------------------------------------------------------------
>
>                 Key: FLINK-12303
>                 URL: https://issues.apache.org/jira/browse/FLINK-12303
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, API / Scala
>    Affects Versions: 1.7.2
>         Environment: Scala 2.11/2.12, Oracle Java 1.8.0_172
>            Reporter: Matěj Novotný
>            Priority: Major
>
> When you use lambdas inside event classes used in streams, it works in 
> Scala 2.11. It stopped working in Scala 2.12. It compiles, but does not 
> process any data and does not throw any exception. I would expect that it 
> would either not compile when I use an unsupported field in an event class, 
> or at least throw some exception.
>  
> For more detail check my demonstration repo, please: 
> [https://github.com/matej-novotny/flink-lambda-bug]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)