Martin Junghanns created FLINK-4872:
---------------------------------------

             Summary: Type erasure problem exclusively on cluster execution
                 Key: FLINK-4872
                 URL: https://issues.apache.org/jira/browse/FLINK-4872
             Project: Flink
          Issue Type: Bug
          Components: Core
    Affects Versions: 1.1.2
            Reporter: Martin Junghanns


The following code runs fine in the local and collection execution environments 
but fails when executed on a cluster.

{code:title=Problem.java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;

import java.lang.reflect.Array;

/**
 * Minimal reproducer for the reported type-erasure failure.
 *
 * Two almost identical jobs are defined: withLong() instantiates the generic
 * types with Long, withPojo() instantiates them with the user-defined Pojo
 * class. According to the report, only the Pojo variant fails, and only on
 * cluster execution.
 */
public class Problem {

  /** Empty user-defined class used as the type argument in withPojo(). */
  public static class Pojo {
  }

  /** Generic Tuple1 subclass whose single field f0 has type T. */
  public static class Foo<T> extends Tuple1<T> {
  }

  /** Generic Tuple1 subclass whose single field f0 is an array of T. */
  public static class Bar<T> extends Tuple1<T[]> {
  }

  /**
   * Map function from Foo&lt;T&gt; to Bar&lt;T&gt;.
   *
   * The element class is passed in explicitly because T is erased at runtime
   * and the function has to create a T[] reflectively.
   */
  public static class UDF<T> implements MapFunction<Foo<T>, Bar<T>> {

    // Runtime class token for T, used to create the array in map().
    private final Class<T> clazz;

    /**
     * @param clazz runtime class of the array elements to create
     */
    public UDF(Class<T> clazz) {
      this.clazz = clazz;
    }

    /**
     * Returns a new Bar whose f0 is a freshly allocated array of length 10
     * with component type clazz. The input value is not read.
     */
    @Override
    public Bar<T> map(Foo<T> value) throws Exception {
      Bar<T> bar = new Bar<>();
      // Array.newInstance returns Object, hence the unchecked cast to T[].
      //noinspection unchecked
      bar.f0 = (T[]) Array.newInstance(clazz, 10);
      return bar;
    }
  }

  /** Runs both variants in sequence: first the Long job, then the Pojo job. */
  public static void main(String[] args) throws Exception {
    // runs in local, collection and cluster execution
    withLong();
    // runs in local and collection execution, fails on cluster execution
    withPojo();
  }

  /**
   * Builds a one-element DataSet of Foo&lt;Long&gt;, maps it through
   * UDF&lt;Long&gt; and prints the result.
   */
  public static void withLong() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Foo<Long> foo = new Foo<>();
    foo.f0 = 42L;
    DataSet<Foo<Long>> barDataSource = env.fromElements(foo);
    DataSet<Bar<Long>> map = barDataSource.map(new UDF<>(Long.class));

    map.print();
  }

  /**
   * Same pipeline as withLong(), but with Pojo as the type argument.
   * This is the variant reported to fail on cluster execution; the reported
   * stack trace goes through the print() call at the end of this method.
   */
  public static void withPojo() throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    Foo<Pojo> foo = new Foo<>();
    foo.f0 = new Pojo();
    DataSet<Foo<Pojo>> barDataSource = env.fromElements(foo);
    DataSet<Bar<Pojo>> map = barDataSource.map(new UDF<>(Pojo.class));

    map.print();
  }
}
{code}

{code:title=ProblemTest.java}
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized JUnit test that runs both jobs of the Problem class.
 *
 * NOTE(review): the execution modes are presumably supplied by
 * MultipleProgramsTestBase; the parameter source is not visible in this
 * listing, so confirm against the base class.
 */
@RunWith(Parameterized.class)
public class ProblemTest extends MultipleProgramsTestBase {

  /**
   * @param mode execution mode for this test instance, forwarded to the base class
   */
  public ProblemTest(TestExecutionMode mode) {
    super(mode);
  }

  /** Runs the Long variant of the job. */
  @Test
  public void testWithLong() throws Exception {
    Problem.withLong();
  }

  /** Runs the Pojo variant of the job, i.e. the one reported to fail on a cluster. */
  @Test
  public void testWithPOJO() throws Exception {
    Problem.withPojo();
  }
}
{code}

Exception:
{code}
The return type of function 'withPojo(Problem.java:58)' could not be determined 
automatically, due to type erasure. You can give type information hints by 
using the returns(...) method on the result of the transformation call, or by 
letting your function implement the 'ResultTypeQueryable' interface.
    org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
    org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
    org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
    Problem.withPojo(Problem.java:60)
    Problem.main(Problem.java:38) 
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to