Repository: flink
Updated Branches:
  refs/heads/master f315c5700 -> db85f3858


[FLINK-3444] [APIs] Add fromElements method with base class type to avoid the 
exception.

This closes #1857


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bb085ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bb085ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bb085ec

Branch: refs/heads/master
Commit: 6bb085ec6d70e196b7b61bec5f6dc3f924ca7906
Parents: 693d5ab
Author: gallenvara <gallenv...@126.com>
Authored: Wed Apr 6 16:04:32 2016 +0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Apr 13 01:10:54 2016 +0200

----------------------------------------------------------------------
 .../flink/api/java/ExecutionEnvironment.java    | 45 ++++++++++++++++-
 .../flink/api/java/io/FromElementsTest.java     | 51 ++++++++++++++++++++
 .../environment/StreamExecutionEnvironment.java | 33 +++++++++++++
 .../api/StreamExecutionEnvironmentTest.java     | 27 +++++++++++
 4 files changed, 155 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1363e26..89c817d 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -777,7 +777,50 @@ public abstract class ExecutionEnvironment {
                        throw new IllegalArgumentException("The number of 
elements must not be zero.");
                }
                
-               return fromCollection(Arrays.asList(data), 
TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
+               TypeInformation<X> typeInfo;
+               try {
+                       typeInfo = TypeExtractor.getForObject(data[0]);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Could not create 
TypeInformation for type " + data[0].getClass().getName()
+                                       + "; please specify the TypeInformation 
manually via "
+                                       + 
"ExecutionEnvironment#fromElements(Collection, TypeInformation)");
+               }
+
+               return fromCollection(Arrays.asList(data), typeInfo, 
Utils.getCallLocationName());
+       }
+       
+       /**
+        * Creates a new data set that contains the given elements, using the supplied base class to
+        * determine the data set's type. Every element must be an instance of that class or of one of
+        * its subclasses, and the sequence of elements must not be empty.
+        * Note that this operation results in a non-parallel data source (parallelism of one).
+        *
+        * @param type The base class type for every element in the collection.
+        * @param data The elements to make up the data set.
+        * @return A DataSet representing the given list of elements.
+        * @throws IllegalArgumentException If the data is null or contains no elements.
+        */
+       @SafeVarargs
+       public final <X> DataSource<X> fromElements(Class<X> type, X... data) {
+               if (data == null) {
+                       throw new IllegalArgumentException("The data must not be null.");
+               }
+               if (data.length == 0) {
+                       throw new IllegalArgumentException("The number of elements must not be zero.");
+               }
+               // Derive the type from the declared base class instead of from the first element.
+               TypeInformation<X> typeInfo;
+               try {
+                       typeInfo = TypeExtractor.getForClass(type);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
+                                       + "; please specify the TypeInformation manually via "
+                                       + "ExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
+               }
+
+               return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
        }
        
        

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java 
b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
new file mode 100644
index 0000000..2f403aa
--- /dev/null
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Test;
+
+public class FromElementsTest {
+
+       @Test
+       public void fromElementsWithBaseTypeTest1() {
+               ExecutionEnvironment executionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment();
+               executionEnvironment.fromElements(ParentType.class, new 
SubType(1, "Java"), new ParentType(1, "hello"));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void fromElementsWithBaseTypeTest2() {
+               ExecutionEnvironment executionEnvironment = 
ExecutionEnvironment.getExecutionEnvironment();
+               executionEnvironment.fromElements(SubType.class, new SubType(1, 
"Java"), new ParentType(1, "hello"));
+       }
+
+       public static class ParentType {
+               int num;
+               String string;
+               public ParentType(int num, String string) {
+                       this.num = num;
+                       this.string = string;
+               }
+       }
+
+       public static class SubType extends ParentType{
+               public SubType(int num, String string) {
+                       super(num, string);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index fb7ec9f..ae4758f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -673,6 +673,39 @@ public abstract class StreamExecutionEnvironment {
        }
 
        /**
+        * Creates a new data stream that contains the given elements, using the supplied base class to
+        * determine the stream's type. Every element must be an instance of that class or of one of
+        * its subclasses, and the sequence of elements must not be empty.
+        * Note that this operation results in a non-parallel data stream source (parallelism of one).
+        *
+        * @param type
+        *              The base class type of the elements in the collection.
+        * @param data
+        *              The array of elements to create the data stream from.
+        * @param <OUT>
+        *              The type of the returned data stream
+        * @return The data stream representing the given array of elements
+        * @throws IllegalArgumentException If no elements are given
+        */
+       @SafeVarargs
+       public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT... data) {
+               if (data.length == 0) {
+                       throw new IllegalArgumentException("fromElements needs at least one element as argument");
+               }
+
+               TypeInformation<OUT> typeInfo;
+               try {
+                       typeInfo = TypeExtractor.getForClass(type);
+               }
+               catch (Exception e) {
+                       throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
+                                       + "; please specify the TypeInformation manually via "
+                                       + "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
+               }
+               return fromCollection(Arrays.asList(data), typeInfo);
+       }
+
+       /**
         * Creates a data stream from the given non-empty collection. The type 
of the data stream is that of the
         * elements in the collection.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 67a4b05..5e596b9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -45,6 +45,18 @@ import org.junit.Test;
 public class StreamExecutionEnvironmentTest extends 
StreamingMultipleProgramsTestBase {
 
        @Test
+       public void fromElementsWithBaseTypeTest1() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(ParentClass.class, new SubClass(1, "Java"), 
new ParentClass(1, "hello"));
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void fromElementsWithBaseTypeTest2() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.fromElements(SubClass.class, new SubClass(1, "Java"), new 
ParentClass(1, "hello"));
+       }
+
+       @Test
        @SuppressWarnings("unchecked")
        public void testFromCollectionParallelism() {
                try {
@@ -159,4 +171,19 @@ public class StreamExecutionEnvironmentTest extends 
StreamingMultipleProgramsTes
                        throw new UnsupportedOperationException();
                }
        }
+
+       public static class ParentClass {
+               int num;
+               String string;
+               public ParentClass(int num, String string) {
+                       this.num = num;
+                       this.string = string;
+               }
+       }
+
+       public static class SubClass extends ParentClass{
+               public SubClass(int num, String string) {
+                       super(num, string);
+               }
+       }
 }

Reply via email to