[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377708#comment-14377708
 ] 

ASF GitHub Bot commented on FLINK-1512:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/426#discussion_r27018849
  
    --- Diff: 
flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
 ---
    @@ -98,98 +123,66 @@ public void setFields(int[] sourceFieldIndices, 
Class<?>[] fieldTypes) {
                setFieldsGeneric(sourceFieldIndices, fieldTypes);
        }
     
    -   public byte[] getCommentPrefix() {
    -           return commentPrefix;
    -   }
    -
    -   public void setCommentPrefix(byte[] commentPrefix) {
    -           this.commentPrefix = commentPrefix;
    -   }
    -
    -   public void setCommentPrefix(char commentPrefix) {
    -           setCommentPrefix(String.valueOf(commentPrefix));
    -   }
    +   public void setFields(boolean[] sourceFieldMask, Class<?>[] fieldTypes) 
{
    +           Preconditions.checkNotNull(sourceFieldMask);
    +           Preconditions.checkNotNull(fieldTypes);
     
    -   public void setCommentPrefix(String commentPrefix) {
    -           setCommentPrefix(commentPrefix, Charsets.UTF_8);
    +           setFieldsGeneric(sourceFieldMask, fieldTypes);
        }
     
    -   public void setCommentPrefix(String commentPrefix, String charsetName) 
throws IllegalCharsetNameException, UnsupportedCharsetException {
    -           if (charsetName == null) {
    -                   throw new IllegalArgumentException("Charset name must 
not be null");
    -           }
    -
    -           if (commentPrefix != null) {
    -                   Charset charset = Charset.forName(charsetName);
    -                   setCommentPrefix(commentPrefix, charset);
    -           } else {
    -                   this.commentPrefix = null;
    -           }
    +   public Class<?>[] getFieldTypes() {
    +           return super.getGenericFieldTypes();
        }
     
    -   public void setCommentPrefix(String commentPrefix, Charset charset) {
    -           if (charset == null) {
    -                   throw new IllegalArgumentException("Charset must not be 
null");
    -           }
    -           if (commentPrefix != null) {
    -                   this.commentPrefix = commentPrefix.getBytes(charset);
    -           } else {
    -                   this.commentPrefix = null;
    -           }
    -   }
    -
    -   @Override
    -   public void close() throws IOException {
    -           if (this.invalidLineCount > 0) {
    -                   if (LOG.isWarnEnabled()) {
    -                           LOG.warn("In file \""+ this.filePath + "\" 
(split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid 
line(s) were skipped.");
    -                   }
    -           }
    -
    -           if (this.commentCount > 0) {
    -                   if (LOG.isInfoEnabled()) {
    -                           LOG.info("In file \""+ this.filePath + "\" 
(split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) 
were skipped.");
    -                   }
    -           }
    -           super.close();
    -   }
    -
    -   @Override
    -   public OUT nextRecord(OUT record) throws IOException {
    -           OUT returnRecord = null;
    -           do {
    -                   returnRecord = super.nextRecord(record);
    -           } while (returnRecord == null && !reachedEnd());
    -
    -           return returnRecord;
    -   }
    -   
        @Override
        public void open(FileInputSplit split) throws IOException {
                super.open(split);
    -           
    +
                @SuppressWarnings("unchecked")
                FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) 
getFieldParsers();
    -           
    +
                //throw exception if no field parsers are available
                if (fieldParsers.length == 0) {
                        throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
                }
    -           
    +
                // create the value holders
                this.parsedValues = new Object[fieldParsers.length];
                for (int i = 0; i < fieldParsers.length; i++) {
                        this.parsedValues[i] = fieldParsers[i].createValue();
                }
     
    -           this.commentCount = 0;
    -           this.invalidLineCount = 0;
    -           
                // left to right evaluation makes access [0] okay
                // this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
                if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
                        this.lineDelimiterIsLinebreak = true;
                }
    +
    +           // for POJO type
    +           if (pojoTypeClass != null) {
    +                   pojoFields = new Field[pojoFieldsName.length];
    +                   try {
    +                           for (int i = 0; i < pojoFieldsName.length; i++) 
{
    +                                   pojoFields[i] = 
pojoTypeClass.getDeclaredField(pojoFieldsName[i]);
    +                                   pojoFields[i].setAccessible(true);
    +                           }
    +                   } catch (NoSuchFieldException e) {
    +                           throw new RuntimeException(e);
    --- End diff --
    
    Add message here as well.


> Add CsvReader for reading into POJOs.
> -------------------------------------
>
>                 Key: FLINK-1512
>                 URL: https://issues.apache.org/jira/browse/FLINK-1512
>             Project: Flink
>          Issue Type: New Feature
>          Components: Java API, Scala API
>            Reporter: Robert Metzger
>            Assignee: Chiwan Park
>            Priority: Minor
>              Labels: starter
>
> Currently, the {{CsvReader}} supports only TupleXX types. 
> It would be nice if users were also able to read into POJOs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to