Repository: flink Updated Branches: refs/heads/master f315c5700 -> db85f3858
[FLINK-3444] [APIs] Add fromElements method with based class type to avoid the exception. This closes #1857 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bb085ec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bb085ec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bb085ec Branch: refs/heads/master Commit: 6bb085ec6d70e196b7b61bec5f6dc3f924ca7906 Parents: 693d5ab Author: gallenvara <gallenv...@126.com> Authored: Wed Apr 6 16:04:32 2016 +0800 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Apr 13 01:10:54 2016 +0200 ---------------------------------------------------------------------- .../flink/api/java/ExecutionEnvironment.java | 45 ++++++++++++++++- .../flink/api/java/io/FromElementsTest.java | 51 ++++++++++++++++++++ .../environment/StreamExecutionEnvironment.java | 33 +++++++++++++ .../api/StreamExecutionEnvironmentTest.java | 27 +++++++++++ 4 files changed, 155 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 1363e26..89c817d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -777,7 +777,50 @@ public abstract class ExecutionEnvironment { throw new IllegalArgumentException("The number of elements must not be zero."); } - return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName()); + TypeInformation<X> typeInfo; + try { + typeInfo = TypeExtractor.getForObject(data[0]); + } + catch (Exception e) { + throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName() + + "; please specify the TypeInformation manually via " + + "ExecutionEnvironment#fromElements(Collection, TypeInformation)"); + } + + return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName()); + } + + /** + * Creates a new data set that contains the given elements. The framework will determine the type according to the + * based type user supplied. The elements should be the same or be the subclass to the based type. + * The sequence of elements must not be empty. + * Note that this operation will result in a non-parallel data source, i.e. a data source with + * a parallelism of one. + * + * @param type The base class type for every element in the collection. + * @param data The elements to make up the data set. + * @return A DataSet representing the given list of elements. + */ + @SafeVarargs + public final <X> DataSource<X> fromElements(Class<X> type, X... data) { + if (data == null) { + throw new IllegalArgumentException("The data must not be null."); + } + if (data.length == 0) { + throw new IllegalArgumentException("The number of elements must not be zero."); + } + + TypeInformation<X> typeInfo; + try { + typeInfo = TypeExtractor.getForClass(type); + } + catch (Exception e) { + throw new RuntimeException("Could not create TypeInformation for type " + type.getName() + + "; please specify the TypeInformation manually via " + + "ExecutionEnvironment#fromElements(Collection, TypeInformation)"); + } + + return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName()); } http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java new file mode 100644 index 0000000..2f403aa --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.junit.Test; + +public class FromElementsTest { + + @Test + public void fromElementsWithBaseTypeTest1() { + ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.fromElements(ParentType.class, new SubType(1, "Java"), new ParentType(1, "hello")); + } + + @Test(expected = IllegalArgumentException.class) + public void fromElementsWithBaseTypeTest2() { + ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); + executionEnvironment.fromElements(SubType.class, new SubType(1, "Java"), new ParentType(1, "hello")); + } + + public static class ParentType { + int num; + String string; + public ParentType(int num, String string) { + this.num = num; + this.string = string; + } + } + + public static class SubType extends ParentType{ + public SubType(int num, String string) { + super(num, string); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index fb7ec9f..ae4758f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -673,6 +673,39 @@ public abstract class StreamExecutionEnvironment { } /** + * Creates a new data set that contains the given elements. The framework will determine the type according to the + * based type user supplied. The elements should be the same or be the subclass to the based type. + * The sequence of elements must not be empty. + * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a + * degree of parallelism one. + * + * @param type + * The based class type in the collection. + * @param data + * The array of elements to create the data stream from. + * @param <OUT> + * The type of the returned data stream + * @return The data stream representing the given array of elements + */ + @SafeVarargs + public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT... data) { + if (data.length == 0) { + throw new IllegalArgumentException("fromElements needs at least one element as argument"); + } + + TypeInformation<OUT> typeInfo; + try { + typeInfo = TypeExtractor.getForClass(type); + } + catch (Exception e) { + throw new RuntimeException("Could not create TypeInformation for type " + type.getName() + + "; please specify the TypeInformation manually via " + + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)"); + } + return fromCollection(Arrays.asList(data), typeInfo); + } + + /** * Creates a data stream from the given non-empty collection. The type of the data stream is that of the * elements in the collection. * http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java index 67a4b05..5e596b9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -45,6 +45,18 @@ import org.junit.Test; public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTestBase { @Test + public void fromElementsWithBaseTypeTest1() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(ParentClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); + } + + @Test(expected = IllegalArgumentException.class) + public void fromElementsWithBaseTypeTest2() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromElements(SubClass.class, new SubClass(1, "Java"), new ParentClass(1, "hello")); + } + + @Test @SuppressWarnings("unchecked") public void testFromCollectionParallelism() { try { @@ -159,4 +171,19 @@ public class StreamExecutionEnvironmentTest extends StreamingMultipleProgramsTes throw new UnsupportedOperationException(); } } + + public static class ParentClass { + int num; + String string; + public ParentClass(int num, String string) { + this.num = num; + this.string = string; + } + } + + public static class SubClass extends ParentClass{ + public SubClass(int num, String string) { + super(num, string); + } + } }