[ https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexander Chermenin updated FLINK-5319: --------------------------------------- Description: Code sample: {code}static abstract class A { int id; A(int id) {this.id = id; } int getId() { return id; } } static class B extends A { B(int id) { super(id % 3); } } static class C extends A { C(int id) { super(id % 2); } } private static B b(int id) { return new B(id); } private static C c(int id) { return new C(id); } /** * Main method. */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new); C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new); DataStreamSource<B> bStream = environment.fromElements(bs); DataStreamSource<C> cStream = environment.fromElements(cs); bStream.keyBy((KeySelector<B, Integer>) A::getId).print(); cStream.keyBy((KeySelector<C, Integer>) A::getId).print(); environment.execute(); } {code} This code throws next exception: {code}Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: Could not extract key from org.sample.flink.examples.Test$C@5e1a8111 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84) at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Could not extract key from org.sample.flink.examples.Test$C@5e1a8111 at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61) at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:72) ... 11 more Caused by: java.lang.ClassCastException: org.sample.flink.examples.Test$C cannot be cast to org.sample.flink.examples.Test$B at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59) ... 15 more{code} This problem occurs when we use method reference as KeySelector. And there are no problems when we use anonymous class or lambda. was: Code sample: {code}static abstract class A { int id; A(int id) {this.id = id; } int getId() { return id; } } static class B extends A { B(int id) { super(id % 3); } } static class C extends A { C(int id) { super(id % 2); } } private static B b(int id) { return new B(id); } private static C c(int id) { return new C(id); } /** * Main method. */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new); C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new); DataStreamSource<B> bStream = environment.fromElements(bs); DataStreamSource<C> cStream = environment.fromElements(cs); bStream.keyBy((KeySelector<B, Integer>) A::getId).print(); cStream.keyBy((KeySelector<C, Integer>) A::getId).print(); environment.execute(); } {code} This code throws next exception: {code}Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: Could not extract key from org.sample.flink.examples.Test$C@5e1a8111 at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84) at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Could not extract key from org.sample.flink.examples.Test$C@5e1a8111 at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61) at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:72) ... 11 more Caused by: java.lang.ClassCastException: org.sample.flink.examples.Test$C cannot be cast to org.sample.flink.examples.Test$B at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59) ... 15 more{code} > ClassCastException when reusing an inherited method as KeySelector for > different classes > ---------------------------------------------------------------------------------------- > > Key: FLINK-5319 > URL: https://issues.apache.org/jira/browse/FLINK-5319 > Project: Flink > Issue Type: Bug > Components: Core > Affects Versions: 1.2.0 > Reporter: Alexander Chermenin > > Code sample: > {code}static abstract class A { > int id; > A(int id) {this.id = id; } > int getId() { return id; } > } > static class B extends A { B(int id) { super(id % 3); } } > static class C extends A { C(int id) { super(id % 2); } } > private static B b(int id) { return new B(id); } > private static C c(int id) { return new C(id); } > /** > * Main method. > */ > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment environment = > StreamExecutionEnvironment.getExecutionEnvironment(); > B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new); > C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new); > DataStreamSource<B> bStream = environment.fromElements(bs); > DataStreamSource<C> cStream = environment.fromElements(cs); > bStream.keyBy((KeySelector<B, Integer>) A::getId).print(); > cStream.keyBy((KeySelector<C, Integer>) A::getId).print(); > environment.execute(); > } > {code} > This code throws next exception: > {code}Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724) > at > org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84) > at > org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Could not extract key from > org.sample.flink.examples.Test$C@5e1a8111 > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61) > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:72) > ... 11 more > Caused by: java.lang.ClassCastException: org.sample.flink.examples.Test$C > cannot be cast to org.sample.flink.examples.Test$B > at > org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59) > ... 15 more{code} > This problem occurs when we use method reference as KeySelector. And there > are no problems when we use anonymous class or lambda. -- This message was sent by Atlassian JIRA (v6.3.4#6332)