[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377708#comment-14377708 ]
ASF GitHub Bot commented on FLINK-1512: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/426#discussion_r27018849 --- Diff: flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java --- @@ -98,98 +123,66 @@ public void setFields(int[] sourceFieldIndices, Class<?>[] fieldTypes) { setFieldsGeneric(sourceFieldIndices, fieldTypes); } - public byte[] getCommentPrefix() { - return commentPrefix; - } - - public void setCommentPrefix(byte[] commentPrefix) { - this.commentPrefix = commentPrefix; - } - - public void setCommentPrefix(char commentPrefix) { - setCommentPrefix(String.valueOf(commentPrefix)); - } + public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) { + Preconditions.checkNotNull(sourceFieldMask); + Preconditions.checkNotNull(fieldTypes); - public void setCommentPrefix(String commentPrefix) { - setCommentPrefix(commentPrefix, Charsets.UTF_8); + setFieldsGeneric(sourceFieldMask, fieldTypes); } - public void setCommentPrefix(String commentPrefix, String charsetName) throws IllegalCharsetNameException, UnsupportedCharsetException { - if (charsetName == null) { - throw new IllegalArgumentException("Charset name must not be null"); - } - - if (commentPrefix != null) { - Charset charset = Charset.forName(charsetName); - setCommentPrefix(commentPrefix, charset); - } else { - this.commentPrefix = null; - } + public Class<?>[] getFieldTypes() { + return super.getGenericFieldTypes(); } - public void setCommentPrefix(String commentPrefix, Charset charset) { - if (charset == null) { - throw new IllegalArgumentException("Charset must not be null"); - } - if (commentPrefix != null) { - this.commentPrefix = commentPrefix.getBytes(charset); - } else { - this.commentPrefix = null; - } - } - - @Override - public void close() throws IOException { - if (this.invalidLineCount > 0) { - if (LOG.isWarnEnabled()) { - LOG.warn("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid line(s) were skipped."); - } - } - - if (this.commentCount > 0) { - if (LOG.isInfoEnabled()) { - LOG.info("In file \""+ this.filePath + "\" (split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) were skipped."); - } - } - super.close(); - } - - @Override - public OUT nextRecord(OUT record) throws IOException { - OUT returnRecord = null; - do { - returnRecord = super.nextRecord(record); - } while (returnRecord == null && !reachedEnd()); - - return returnRecord; - } - @Override public void open(FileInputSplit split) throws IOException { super.open(split); - + @SuppressWarnings("unchecked") FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers(); - + //throw exception if no field parsers are available if (fieldParsers.length == 0) { throw new IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to parse input"); } - + // create the value holders this.parsedValues = new Object[fieldParsers.length]; for (int i = 0; i < fieldParsers.length; i++) { this.parsedValues[i] = fieldParsers[i].createValue(); } - this.commentCount = 0; - this.invalidLineCount = 0; - // left to right evaluation makes access [0] okay // this marker is used to fasten up readRecord, so that it doesn't have to check each call if the line ending is set to default if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) { this.lineDelimiterIsLinebreak = true; } + + // for POJO type + if (pojoTypeClass != null) { + pojoFields = new Field[pojoFieldsName.length]; + try { + for (int i = 0; i < pojoFieldsName.length; i++) { + pojoFields[i] = pojoTypeClass.getDeclaredField(pojoFieldsName[i]); + pojoFields[i].setAccessible(true); + } + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); --- End diff -- Add message here as well. > Add CsvReader for reading into POJOs. > ------------------------------------- > > Key: FLINK-1512 > URL: https://issues.apache.org/jira/browse/FLINK-1512 > Project: Flink > Issue Type: New Feature > Components: Java API, Scala API > Reporter: Robert Metzger > Assignee: Chiwan Park > Priority: Minor > Labels: starter > > Currently, the {{CsvReader}} supports only TupleXX types. > It would be nice if users were also able to read into POJOs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)