[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376943#comment-14376943 ]
ASF GitHub Bot commented on FLINK-1512: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r26993011 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java --- @@ -684,4 +693,249 @@ private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreaker } } + private void validatePojoItem(CsvInputFormat<PojoItem> format) throws Exception { + PojoItem item = new PojoItem(); + + format.nextRecord(item); + + assertEquals(123, item.field1); + assertEquals("AAA", item.field2); + assertEquals(Double.valueOf(3.123), item.field3); + assertEquals("BBB", item.field4); + + format.nextRecord(item); + + assertEquals(456, item.field1); + assertEquals("BBB", item.field2); + assertEquals(Double.valueOf(1.123), item.field3); + assertEquals("AAA", item.field4); + } + + @Test + public void testPojoType() throws Exception { + File tempFile = File.createTempFile("CsvReaderPojoType", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); + wrt.write("123,AAA,3.123,BBB\n"); + wrt.write("456,BBB,1.123,AAA\n"); + wrt.close(); + + @SuppressWarnings("unchecked") + TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class); + CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo); + + inputFormat.configure(new Configuration()); + FileInputSplit[] splits = inputFormat.createInputSplits(1); + + inputFormat.open(splits[0]); + + validatePojoItem(inputFormat); + } + + @Test + public void testPojoTypeWithPrivateField() throws Exception { + File tempFile = File.createTempFile("CsvReaderPojoType", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); + wrt.write("123,AAA,3.123,BBB\n"); + wrt.write("456,BBB,1.123,AAA\n"); + wrt.close(); + + @SuppressWarnings("unchecked") + TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class); + CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo); + + inputFormat.configure(new Configuration()); + + FileInputSplit[] splits = inputFormat.createInputSplits(1); + inputFormat.open(splits[0]); + + PrivatePojoItem item = new PrivatePojoItem(); + inputFormat.nextRecord(item); + + assertEquals(123, item.field1); + assertEquals("AAA", item.field2); + assertEquals(Double.valueOf(3.123), item.field3); + assertEquals("BBB", item.field4); + + inputFormat.nextRecord(item); + + assertEquals(456, item.field1); + assertEquals("BBB", item.field2); + assertEquals(Double.valueOf(1.123), item.field3); + assertEquals("AAA", item.field4); + } + + @Test + public void testPojoTypeWithMappingInformation() throws Exception { + File tempFile = File.createTempFile("CsvReaderPojoType", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); + wrt.write("123,3.123,AAA,BBB\n"); + wrt.write("456,1.123,BBB,AAA\n"); + wrt.close(); + + @SuppressWarnings("unchecked") + TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class); + CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo); + inputFormat.setFieldsOrder(new String[]{"field1", "field3", "field2", "field4"}); + + inputFormat.configure(new Configuration()); + FileInputSplit[] splits = inputFormat.createInputSplits(1); + + inputFormat.open(splits[0]); + + validatePojoItem(inputFormat); + } + + @Test + public void testPojoTypeWithPartialFieldInCSV() throws Exception { + File tempFile = File.createTempFile("CsvReaderPojoType", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile)); + wrt.write("1.123,BBB,456,AAA\n"); --- End diff -- Instead of reading into a smaller POJO, you could also add more than four fields to the test input file. Then you don't need the SmallPojoItem and can reuse the validation method. > Add CsvReader for reading into POJOs. > ------------------------------------- > > Key: FLINK-1512 > URL: https://issues.apache.org/jira/browse/FLINK-1512 > Project: Flink > Issue Type: New Feature > Components: Java API, Scala API > Reporter: Robert Metzger > Assignee: Chiwan Park > Priority: Minor > Labels: starter > > Currently, the {{CsvReader}} supports only TupleXX types. > It would be nice if users were also able to read into POJOs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)