dawidwys commented on a change in pull request #15507:
URL: https://github.com/apache/flink/pull/15507#discussion_r608676177



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
##########
@@ -68,6 +88,135 @@ public void testStrings() {
         }
     }
 
+    @Test
+    public void testNullElement() throws Exception {
+        try {
+            new FromElementsFunction<>("a", null, "b");
+            fail("expect exception");
+        } catch (Exception ex) {
+            assertThat(ex, instanceOf(IllegalArgumentException.class));
+            assertThat(ex.getMessage(), containsString("contains a null 
element"));
+        }
+    }
+
+    @Test
+    public void testSetOutputTypeWithNoSerializer() throws Exception {
+        FromElementsFunction<String> source = new 
FromElementsFunction<>(STRING_ARRAY_DATA);
+
+        assertNull(source.getSerializer());
+
+        source.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new 
ExecutionConfig());
+
+        assertNotNull(source.getSerializer());
+        assertEquals(
+                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                source.getSerializer());
+
+        List<String> result = runSource(source);
+
+        assertEquals(STRING_LIST_DATA, result);
+    }
+
+    @Test
+    public void testSetOutputTypeWithSameSerializer() throws Exception {
+        FromElementsFunction<String> source =
+                new FromElementsFunction<>(
+                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                        STRING_LIST_DATA);
+
+        TypeSerializer<String> existingSerializer = source.getSerializer();
+
+        source.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new 
ExecutionConfig());
+
+        TypeSerializer<String> newSerializer = source.getSerializer();
+
+        assertEquals(existingSerializer, newSerializer);
+
+        List<String> result = runSource(source);
+
+        assertEquals(STRING_LIST_DATA, result);
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void testSetOutputTypeWithIncompatibleType() throws Exception {
+        FromElementsFunction<String> source = new 
FromElementsFunction<>(STRING_LIST_DATA);
+
+        try {
+            source.setOutputType(
+                    (TypeInformation) BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
+            fail("expect exception");
+        } catch (Exception ex) {
+            assertThat(ex, instanceOf(IllegalArgumentException.class));
+            assertThat(ex.getMessage(), containsString("not all subclasses of 
java.lang.Integer"));
+        }
+    }
+
+    @Test
+    public void testSetOutputTypeWithDifferentSerializer() throws Exception {
+        FromElementsFunction<String> source =
+                new FromElementsFunction<>(
+                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                        STRING_LIST_DATA);
+
+        TypeSerializer<String> existingSerializer = source.getSerializer();
+
+        source.setOutputType(new GenericTypeInfo<>(String.class), new 
ExecutionConfig());
+
+        TypeSerializer<String> newSerializer = source.getSerializer();
+
+        assertNotEquals(existingSerializer, newSerializer);
+
+        List<String> result = runSource(source);
+
+        assertEquals(STRING_LIST_DATA, result);
+    }
+
+    @Test
+    public void testSetOutputTypeWithExistingBrokenSerializer() throws 
Exception {

Review comment:
       How is this test different from the one above? In the context of 
`setOutputType`?
   
   Could we have one test (instead of 
`testSetOutputTypeWithExistingBrokenSerializer` & 
`testSetOutputTypeWithDifferentSerializer`) that verifies that the updated 
serializer is used? E.g. a custom string serializer that adds additional 
characters, or something similar?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java
##########
@@ -209,6 +211,25 @@ public void testVirtualTransformations() throws Exception {
                         instanceof ShufflePartitioner);
     }
 
+    @Test
+    public void testOutputTypeConfigurationWithUdfStreamOperator() throws 
Exception {

Review comment:
       How is this test related to the `fromElements` function?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
##########
@@ -68,6 +88,135 @@ public void testStrings() {
         }
     }
 
+    @Test
+    public void testNullElement() throws Exception {
+        try {
+            new FromElementsFunction<>("a", null, "b");
+            fail("expect exception");
+        } catch (Exception ex) {
+            assertThat(ex, instanceOf(IllegalArgumentException.class));
+            assertThat(ex.getMessage(), containsString("contains a null 
element"));
+        }
+    }
+
+    @Test
+    public void testSetOutputTypeWithNoSerializer() throws Exception {
+        FromElementsFunction<String> source = new 
FromElementsFunction<>(STRING_ARRAY_DATA);
+
+        assertNull(source.getSerializer());
+
+        source.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new 
ExecutionConfig());
+
+        assertNotNull(source.getSerializer());
+        assertEquals(
+                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                source.getSerializer());
+
+        List<String> result = runSource(source);
+
+        assertEquals(STRING_LIST_DATA, result);
+    }
+
+    @Test
+    public void testSetOutputTypeWithSameSerializer() throws Exception {
+        FromElementsFunction<String> source =
+                new FromElementsFunction<>(
+                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                        STRING_LIST_DATA);
+
+        TypeSerializer<String> existingSerializer = source.getSerializer();
+
+        source.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new 
ExecutionConfig());
+
+        TypeSerializer<String> newSerializer = source.getSerializer();
+
+        assertEquals(existingSerializer, newSerializer);
+
+        List<String> result = runSource(source);
+
+        assertEquals(STRING_LIST_DATA, result);
+    }
+
+    @Test
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void testSetOutputTypeWithIncompatibleType() throws Exception {
+        FromElementsFunction<String> source = new 
FromElementsFunction<>(STRING_LIST_DATA);
+
+        try {
+            source.setOutputType(
+                    (TypeInformation) BasicTypeInfo.INT_TYPE_INFO, new 
ExecutionConfig());
+            fail("expect exception");
+        } catch (Exception ex) {
+            assertThat(ex, instanceOf(IllegalArgumentException.class));
+            assertThat(ex.getMessage(), containsString("not all subclasses of 
java.lang.Integer"));
+        }
+    }
+
+    @Test
+    public void testSetOutputTypeWithDifferentSerializer() throws Exception {
+        FromElementsFunction<String> source =
+                new FromElementsFunction<>(
+                        BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new 
ExecutionConfig()),
+                        STRING_LIST_DATA);
+
+        TypeSerializer<String> existingSerializer = source.getSerializer();
+
+        source.setOutputType(new GenericTypeInfo<>(String.class), new 
ExecutionConfig());
+
+        TypeSerializer<String> newSerializer = source.getSerializer();
+
+        assertNotEquals(existingSerializer, newSerializer);
+
+        List<String> result = runSource(source);
+
+        assertEquals(STRING_LIST_DATA, result);
+    }
+
+    @Test
+    public void testSetOutputTypeWithExistingBrokenSerializer() throws 
Exception {
+        TypeInformation<DeserializeTooMuchType> info =
+                new ValueTypeInfo<>(DeserializeTooMuchType.class);
+
+        FromElementsFunction<DeserializeTooMuchType> source =
+                new FromElementsFunction<>(
+                        info.createSerializer(new ExecutionConfig()), new 
DeserializeTooMuchType());
+
+        source.setOutputType(
+                new GenericTypeInfo<>(DeserializeTooMuchType.class), new 
ExecutionConfig());
+
+        List<DeserializeTooMuchType> result = runSource(source);
+
+        assertThat(result, hasSize(1));
+        assertThat(result.get(0), instanceOf(DeserializeTooMuchType.class));
+    }
+
+    @Test
+    public void testSetOutputTypeAfterTransferred() throws Exception {
+        try {
+            FromElementsFunction<String> source =
+                    InstantiationUtil.clone(new 
FromElementsFunction<>(STRING_LIST_DATA));
+
+            source.setOutputType(BasicTypeInfo.STRING_TYPE_INFO, new 
ExecutionConfig());
+            fail("expect exception");
+        } catch (Exception ex) {
+            assertThat(ex, instanceOf(IllegalStateException.class));
+            assertThat(ex.getMessage(), containsString("elements lost during 
serialization"));

Review comment:
       This message does not help much. Could we rather explain what is the 
root cause? Something like:
   
   `The output type should've been specified before shipping the graph to the 
cluster`? or sth similar?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/FromElementsFunctionTest.java
##########
@@ -68,6 +88,135 @@ public void testStrings() {
         }
     }
 
+    @Test
+    public void testNullElement() throws Exception {
+        try {
+            new FromElementsFunction<>("a", null, "b");
+            fail("expect exception");
+        } catch (Exception ex) {
+            assertThat(ex, instanceOf(IllegalArgumentException.class));

Review comment:
       How about we the ExpectedException rule instead?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -1096,12 +1096,7 @@ private StateBackend loadStateBackend(ReadableConfig 
configuration, ClassLoader
         // must not have null elements and mixed elements
         FromElementsFunction.checkCollection(data, typeInfo.getTypeClass());
 
-        SourceFunction<OUT> function;
-        try {
-            function = new 
FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
-        } catch (IOException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
+        SourceFunction<OUT> function = new FromElementsFunction<>(data);

Review comment:
       Could we add a single positive test for the `returns` method on 
`fromElements`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to