[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356930#comment-14356930 ]
ASF GitHub Bot commented on FLINK-1512: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26215940 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java --- @@ -684,4 +693,178 @@ private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreaker } } + @Test + public void testPojoType() throws Exception { + File tempFile = File.createTempFile("CsvReaderPojoType", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); + wrt.write("123,AAA,3.123,BBB\n"); + wrt.write("456,BBB,1.123,AAA\n"); + wrt.close(); + + @SuppressWarnings("unchecked") + TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class); + CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo); + + Configuration parameters = new Configuration(); + inputFormat.configure(parameters); + + inputFormat.setDelimiter('\n'); + inputFormat.setFieldDelimiter(','); + + FileInputSplit[] splits = inputFormat.createInputSplits(1); + + inputFormat.open(splits[0]); + + PojoItem item = new PojoItem(); + inputFormat.nextRecord(item); + + assertEquals(123, item.field1); + assertEquals("AAA", item.field2); + assertEquals(Double.valueOf(3.123), item.field3); + assertEquals("BBB", item.field4); + + inputFormat.nextRecord(item); + + assertEquals(456, item.field1); + assertEquals("BBB", item.field2); + assertEquals(Double.valueOf(1.123), item.field3); + assertEquals("AAA", item.field4); + } + + @Test + public void testPojoTypeWithPrivateField() throws Exception { + File tempFile = File.createTempFile("CsvReaderPojoType", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); + 
wrt.write("123,AAA,3.123,BBB\n"); + wrt.write("456,BBB,1.123,AAA\n"); + wrt.close(); + + @SuppressWarnings("unchecked") + TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class); + CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo); + + Configuration parameters = new Configuration(); + inputFormat.configure(parameters); + + inputFormat.setDelimiter('\n'); + inputFormat.setFieldDelimiter(','); + + FileInputSplit[] splits = inputFormat.createInputSplits(1); + + inputFormat.open(splits[0]); + + PrivatePojoItem item = new PrivatePojoItem(); + inputFormat.nextRecord(item); + + assertEquals(123, item.field1); + assertEquals("AAA", item.field2); + assertEquals(Double.valueOf(3.123), item.field3); + assertEquals("BBB", item.field4); + + inputFormat.nextRecord(item); + + assertEquals(456, item.field1); + assertEquals("BBB", item.field2); + assertEquals(Double.valueOf(1.123), item.field3); + assertEquals("AAA", item.field4); + } + + @Test + public void testPojoTypeWithMappingInformation() throws Exception { + File tempFile = File.createTempFile("CsvReaderPojoType", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); + wrt.write("123,3.123,AAA,BBB\n"); + wrt.write("456,1.123,BBB,AAA\n"); + wrt.close(); + + @SuppressWarnings("unchecked") + TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class); + CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo); + inputFormat.setFieldsMap(new String[]{"field1", "field3", "field2", "field4"}); + + Configuration parameters = new Configuration(); + inputFormat.configure(parameters); + + inputFormat.setDelimiter('\n'); + inputFormat.setFieldDelimiter(','); + + FileInputSplit[] 
splits = inputFormat.createInputSplits(1); + + inputFormat.open(splits[0]); + + PojoItem item = new PojoItem(); + inputFormat.nextRecord(item); + + assertEquals(123, item.field1); + assertEquals("AAA", item.field2); + assertEquals(Double.valueOf(3.123), item.field3); + assertEquals("BBB", item.field4); + + inputFormat.nextRecord(item); + + assertEquals(456, item.field1); + assertEquals("BBB", item.field2); + assertEquals(Double.valueOf(1.123), item.field3); + assertEquals("AAA", item.field4); + } + --- End diff -- Can you add a few tests where the CSV file has more fields than the POJO, covering these cases: - all fields of the POJO are filled from a selection of the file's fields (including the first and the last field of the file); - only a subset of the POJO fields is filled from a selection of the file's fields. Please also add tests that check for correct error messages when: - the number of POJO fields differs from the number of selected CSV fields; - a selected POJO field does not exist; - a POJO field has a type that the format cannot parse, i.e. one that is neither a Java primitive nor a String (check the available FieldParsers). > Add CsvReader for reading into POJOs. > ------------------------------------- > > Key: FLINK-1512 > URL: https://issues.apache.org/jira/browse/FLINK-1512 > Project: Flink > Issue Type: New Feature > Components: Java API, Scala API > Reporter: Robert Metzger > Assignee: Chiwan Park > Priority: Minor > Labels: starter > > Currently, the {{CsvReader}} supports only TupleXX types. > It would be nice if users were also able to read into POJOs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)