[ 
https://issues.apache.org/jira/browse/FLINK-5319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Chermenin updated FLINK-5319:
---------------------------------------
    Description: 
Code sample:
{code}static abstract class A {
    int id;
    A(int id) {this.id = id; }
    int getId() { return id; }
}

static class B extends A { B(int id) { super(id % 3); } }
static class C extends A { C(int id) { super(id % 2); } }

private static B b(int id) { return new B(id); }
private static C c(int id) { return new C(id); }

/**
 * Main method.
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment =
        StreamExecutionEnvironment.getExecutionEnvironment();

    B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new);
    C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new);

    DataStreamSource<B> bStream = environment.fromElements(bs);
    DataStreamSource<C> cStream = environment.fromElements(cs);

    bStream.keyBy((KeySelector<B, Integer>) A::getId).print();
    cStream.keyBy((KeySelector<C, Integer>) A::getId).print();

    environment.execute();
}
{code}

This code throws the following exception:
{code}Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not extract key from org.sample.flink.examples.Test$C@5e1a8111
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84)
        at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not extract key from org.sample.flink.examples.Test$C@5e1a8111
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:72)
        ... 11 more
Caused by: java.lang.ClassCastException: org.sample.flink.examples.Test$C cannot be cast to org.sample.flink.examples.Test$B
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59)
        ... 15 more{code}

This problem occurs only when a method reference is used as the KeySelector.
There are no problems when an anonymous class or a lambda is used instead.
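
For comparison, the three ways of writing the selector can be put side by side. This is only a minimal sketch: to compile without Flink on the classpath it declares its own stand-in {{KeySelector}} interface (an assumption that mirrors the single {{getKey}} method of Flink's interface), and the class name {{KeySelectorForms}} and the helper {{keyOf}} are made up for illustration. In plain Java all three forms return the same keys; the failure described above was seen only when the method-reference form was passed to {{keyBy}}.

```java
import java.io.Serializable;

public class KeySelectorForms {

    // Hypothetical stand-in for Flink's KeySelector, declared locally so the
    // sketch compiles without Flink. It is NOT the Flink class itself.
    interface KeySelector<IN, KEY> extends Serializable {
        KEY getKey(IN value) throws Exception;
    }

    // Same class hierarchy as in the code sample of this issue.
    static abstract class A {
        int id;
        A(int id) { this.id = id; }
        int getId() { return id; }
    }

    static class B extends A { B(int id) { super(id % 3); } }
    static class C extends A { C(int id) { super(id % 2); } }

    // Form used in the report: a reference to the inherited method A::getId.
    static final KeySelector<B, Integer> B_BY_METHOD_REF = A::getId;

    // Lambda form, reported to work.
    static final KeySelector<B, Integer> B_BY_LAMBDA = value -> value.getId();
    static final KeySelector<C, Integer> C_BY_LAMBDA = value -> value.getId();

    // Anonymous class form, also reported to work.
    static final KeySelector<C, Integer> C_BY_ANONYMOUS = new KeySelector<C, Integer>() {
        @Override
        public Integer getKey(C value) {
            return value.getId();
        }
    };

    // Illustrative helper: applies a selector and wraps the checked exception.
    static <T> int keyOf(KeySelector<T, Integer> selector, T value) {
        try {
            return selector.getKey(value);
        } catch (Exception e) {
            throw new RuntimeException("Could not extract key from " + value, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(keyOf(B_BY_METHOD_REF, new B(5))); // 5 % 3 = 2
        System.out.println(keyOf(B_BY_LAMBDA, new B(5)));     // 5 % 3 = 2
        System.out.println(keyOf(C_BY_LAMBDA, new C(5)));     // 5 % 2 = 1
        System.out.println(keyOf(C_BY_ANONYMOUS, new C(5)));  // 5 % 2 = 1
    }
}
```

In the Flink job itself, the equivalent change would be to replace {{(KeySelector<B, Integer>) A::getId}} with a lambda or an anonymous {{KeySelector}} for each stream.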

  was:
Code sample:
{code}static abstract class A {
    int id;
    A(int id) {this.id = id; }
    int getId() { return id; }
}

static class B extends A { B(int id) { super(id % 3); } }
static class C extends A { C(int id) { super(id % 2); } }

private static B b(int id) { return new B(id); }
private static C c(int id) { return new C(id); }

/**
 * Main method.
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment =
        StreamExecutionEnvironment.getExecutionEnvironment();

    B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new);
    C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new);

    DataStreamSource<B> bStream = environment.fromElements(bs);
    DataStreamSource<C> cStream = environment.fromElements(cs);

    bStream.keyBy((KeySelector<B, Integer>) A::getId).print();
    cStream.keyBy((KeySelector<C, Integer>) A::getId).print();

    environment.execute();
}
{code}

This code throws the following exception:
{code}Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not extract key from org.sample.flink.examples.Test$C@5e1a8111
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84)
        at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not extract key from org.sample.flink.examples.Test$C@5e1a8111
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:72)
        ... 11 more
Caused by: java.lang.ClassCastException: org.sample.flink.examples.Test$C cannot be cast to org.sample.flink.examples.Test$B
        at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59)
        ... 15 more{code}


> ClassCastException when reusing an inherited method as KeySelector for 
> different classes
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-5319
>                 URL: https://issues.apache.org/jira/browse/FLINK-5319
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Alexander Chermenin
>
> Code sample:
> {code}static abstract class A {
>     int id;
>     A(int id) {this.id = id; }
>     int getId() { return id; }
> }
> static class B extends A { B(int id) { super(id % 3); } }
> static class C extends A { C(int id) { super(id % 2); } }
> private static B b(int id) { return new B(id); }
> private static C c(int id) { return new C(id); }
> /**
>  * Main method.
>  */
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment environment =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     B[] bs = IntStream.range(0, 10).mapToObj(Test::b).toArray(B[]::new);
>     C[] cs = IntStream.range(0, 10).mapToObj(Test::c).toArray(C[]::new);
>     DataStreamSource<B> bStream = environment.fromElements(bs);
>     DataStreamSource<C> cStream = environment.fromElements(cs);
>     bStream.keyBy((KeySelector<B, Integer>) A::getId).print();
>     cStream.keyBy((KeySelector<C, Integer>) A::getId).print();
>     environment.execute();
> }
> {code}
> This code throws the following exception:
> {code}Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:901)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>       at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:844)
>       at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not extract key from org.sample.flink.examples.Test$C@5e1a8111
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:75)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:746)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:724)
>       at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84)
>       at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:127)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:75)
>       at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>       at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Could not extract key from org.sample.flink.examples.Test$C@5e1a8111
>       at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:61)
>       at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>       at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:72)
>       ... 11 more
> Caused by: java.lang.ClassCastException: org.sample.flink.examples.Test$C cannot be cast to org.sample.flink.examples.Test$B
>       at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:59)
>       ... 15 more{code}
> This problem occurs only when a method reference is used as the KeySelector.
> There are no problems when an anonymous class or a lambda is used instead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
