Repository: flink Updated Branches: refs/heads/master ca82b0cc3 -> 40be172e4
http://git-wip-us.apache.org/repos/asf/flink/blob/8c46155a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java index 2cd3558..0a712e0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java @@ -26,9 +26,9 @@ import java.util.List; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.datastream.StreamProjection; -import org.apache.flink.streaming.api.operators.StreamProject; import org.apache.flink.streaming.util.MockContext; import org.junit.Test; @@ -43,15 +43,17 @@ public class ProjectTest implements Serializable { 4)); int[] fields = new int[] { 4, 4, 3 }; - Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class }; + // Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class }; @SuppressWarnings("unchecked") - StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator = new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>( + StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator = + new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>( fields, - (TypeInformation<Tuple3<Integer, Integer, String>>) StreamProjection - .extractFieldTypes(fields, classes, inType)); + new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection + .extractFieldTypes(fields, inType))); - List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>(); + List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer, + String, Integer>>(); input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4)); input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2)); input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2)); http://git-wip-us.apache.org/repos/asf/flink/blob/8c46155a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java new file mode 100644 index 0000000..f924cfb --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectWithoutClassTest.java @@ -0,0 +1,22 @@ +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +public class ProjectWithoutClassTest { + + public static void main(String[] args) throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.generateSequence(1, 100).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() { + + @Override + public Tuple3<Long, Character, Double> map(Long value) throws Exception { + return new Tuple3<Long, Character, Double>(value, 'c', (double) value); + } + }).project(0,2).print(); + + env.execute("ProjectWithoutClassTest"); + } +}