[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-25 Thread Chiwan Park (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14381156#comment-14381156 ]

Chiwan Park commented on FLINK-1512:


Thanks for merging it down :)

> Add CsvReader for reading into POJOs.
> -
>
> Key: FLINK-1512
> URL: https://issues.apache.org/jira/browse/FLINK-1512
> Project: Flink
>  Issue Type: New Feature
>  Components: Java API, Scala API
>Reporter: Robert Metzger
>Assignee: Chiwan Park
>Priority: Minor
>  Labels: starter
> Fix For: 0.9
>
>
> Currently, the {{CsvReader}} supports only TupleXX types. 
> It would be nice if users were also able to read into POJOs.
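The requested feature can be illustrated with a self-contained sketch (plain Java reflection, not Flink's actual API; the `Person` POJO and the `parse` helper are hypothetical) of what mapping CSV columns onto named POJO fields involves:

```java
import java.lang.reflect.Field;

public class CsvPojoSketch {
    // Hypothetical POJO target; the class and field names are illustrative only.
    public static class Person {
        public String name;
        public int age;
    }

    // Map the columns of one CSV line onto the named public fields,
    // in the order the caller specifies.
    public static Person parse(String line, String... fieldOrder) {
        String[] columns = line.split(",");
        Person p = new Person();
        try {
            for (int i = 0; i < fieldOrder.length; i++) {
                Field f = Person.class.getField(fieldOrder[i]);
                // Convert the raw string to the field's declared type.
                Object value = (f.getType() == int.class)
                        ? Integer.parseInt(columns[i].trim())
                        : columns[i].trim();
                f.set(p, value);
            }
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not set POJO field", e);
        }
        return p;
    }

    public static void main(String[] args) {
        Person p = parse("Alice,30", "name", "age");
        System.out.println(p.name + " " + p.age); // prints "Alice 30"
    }
}
```

A Tuple-based reader can set fields positionally, whereas a POJO reader needs some mapping from column positions to field names, which is what drives the field-order discussion in the comments below.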



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380612#comment-14380612 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/426




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14380037#comment-14380037 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-86079595
  
@chiwanpark excellent job, thanks!
Will merge it once a final round of Travis tests has passed.






[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14379815#comment-14379815 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-86011782
  
@fhueske You can check it now :)




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14379813#comment-14379813 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-86011531
  
Sure, no problem :-) 
Can I check it now or do you need a bit more time?




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14379809#comment-14379809 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-86011099
  
Oops, I pushed an intermediate commit a8a5c37. I will fix it.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14379805#comment-14379805 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-86010852
  
@chiwanpark Thanks, the code looks good to me!
Will try and merge it if everything works.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14379359#comment-14379359 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85866221
  
Cool, thanks. Will have a look shortly.

Did you rebase to the latest master? We had a few build issues with the
master branch a few days ago.

It is a good sign if it builds on your machine.
On Mar 25, 2015 5:47 AM, "Chiwan Park"  wrote:

> Hi, I updated this PR.
>
>- Remove pojoType(Class targetType) method in CsvReader to force
>the user to explicitly specify the fields order.
>- Add checking the fields order existence routine in readCsvFile
>method.
>- Add two integration tests for above 2 modifications.
>
> By the way, I cannot find out why Travis fails. On my machine, mvn clean
> install -DskipTests and mvn verify both succeed. From the Travis log, it seems
> that the problem is related to Gelly. Although I read some code in Gelly, I
> cannot find what exactly the problem is.
>
> Could anyone help me with this problem?
>
> —
> Reply to this email directly or view it on GitHub.
>





[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14379285#comment-14379285 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85830275
  
Hi, I updated this PR.

* Remove `pojoType(Class targetType)` method in `CsvReader` to force the 
user to explicitly specify the fields order.
* Add checking the fields order existence routine in `readCsvFile` method.
* Add two integration tests for above 2 modifications.

By the way, I cannot find out why Travis fails. On my machine, `mvn clean
install -DskipTests` and `mvn verify` both succeed. From the [travis
log](https://travis-ci.org/chiwanpark/flink/jobs/55747686#L7074), it seems that
the problem is related to Gelly. Although I read some code in Gelly, I cannot
find what exactly the problem is.

Could anyone help me with this problem?
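The motivation for forcing an explicit field order can be demonstrated with a small stand-alone check (plain Java, not the PR's code): reflection reports a class's declared fields in no guaranteed order, so a CSV reader cannot silently rely on it to align columns with fields.

```java
import java.lang.reflect.Field;
import java.util.Arrays;

public class FieldOrderCheck {
    // Illustrative POJO; the names are hypothetical.
    public static class Point {
        public int x;
        public int y;
    }

    // Returns the declared field names. The JVM makes no promise that these
    // come back in source order, which is why a CSV-to-POJO reader should ask
    // the caller for an explicit order instead of guessing.
    public static String[] declaredOrder(Class<?> clazz) {
        return Arrays.stream(clazz.getDeclaredFields())
                     .map(Field::getName)
                     .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // The set of names is stable, but their order is an implementation detail.
        System.out.println(Arrays.toString(declaredOrder(Point.class)));
    }
}
```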




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377716#comment-14377716 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85455383
  
Very good! Let me know when you want me to have a look again :-)




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377710#comment-14377710 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018880
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 ---
@@ -223,8 +224,11 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
* @param lenient Whether the parser should silently ignore malformed 
lines.
* @param includedFields The fields in the file that should be read. Per 
default all fields
*   are read.
+   * @param fieldsOrder The order information between CSV data and POJO 
fields. Without order
--- End diff --

change parameter name to `pojoFieldOrder`?




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377708#comment-14377708 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018849
  
--- Diff: 
flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
 ---
@@ -98,98 +123,66 @@ public void setFields(int[] sourceFieldIndices, 
Class[] fieldTypes) {
setFieldsGeneric(sourceFieldIndices, fieldTypes);
}
 
-   public byte[] getCommentPrefix() {
-   return commentPrefix;
-   }
-
-   public void setCommentPrefix(byte[] commentPrefix) {
-   this.commentPrefix = commentPrefix;
-   }
-
-   public void setCommentPrefix(char commentPrefix) {
-   setCommentPrefix(String.valueOf(commentPrefix));
-   }
+   public void setFields(boolean[] sourceFieldMask, Class[] fieldTypes) 
{
+   Preconditions.checkNotNull(sourceFieldMask);
+   Preconditions.checkNotNull(fieldTypes);
 
-   public void setCommentPrefix(String commentPrefix) {
-   setCommentPrefix(commentPrefix, Charsets.UTF_8);
+   setFieldsGeneric(sourceFieldMask, fieldTypes);
}
 
-   public void setCommentPrefix(String commentPrefix, String charsetName) 
throws IllegalCharsetNameException, UnsupportedCharsetException {
-   if (charsetName == null) {
-   throw new IllegalArgumentException("Charset name must 
not be null");
-   }
-
-   if (commentPrefix != null) {
-   Charset charset = Charset.forName(charsetName);
-   setCommentPrefix(commentPrefix, charset);
-   } else {
-   this.commentPrefix = null;
-   }
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
}
 
-   public void setCommentPrefix(String commentPrefix, Charset charset) {
-   if (charset == null) {
-   throw new IllegalArgumentException("Charset must not be 
null");
-   }
-   if (commentPrefix != null) {
-   this.commentPrefix = commentPrefix.getBytes(charset);
-   } else {
-   this.commentPrefix = null;
-   }
-   }
-
-   @Override
-   public void close() throws IOException {
-   if (this.invalidLineCount > 0) {
-   if (LOG.isWarnEnabled()) {
-   LOG.warn("In file \""+ this.filePath + "\" 
(split start: " + this.splitStart + ") " + this.invalidLineCount +" invalid 
line(s) were skipped.");
-   }
-   }
-
-   if (this.commentCount > 0) {
-   if (LOG.isInfoEnabled()) {
-   LOG.info("In file \""+ this.filePath + "\" 
(split start: " + this.splitStart + ") " + this.commentCount +" comment line(s) 
were skipped.");
-   }
-   }
-   super.close();
-   }
-
-   @Override
-   public OUT nextRecord(OUT record) throws IOException {
-   OUT returnRecord = null;
-   do {
-   returnRecord = super.nextRecord(record);
-   } while (returnRecord == null && !reachedEnd());
-
-   return returnRecord;
-   }
-   
@Override
public void open(FileInputSplit split) throws IOException {
super.open(split);
-   
+
@SuppressWarnings("unchecked")
FieldParser[] fieldParsers = (FieldParser[]) 
getFieldParsers();
-   
+
//throw exception if no field parsers are available
if (fieldParsers.length == 0) {
throw new 
IOException("CsvInputFormat.open(FileInputSplit split) - no field parsers to 
parse input");
}
-   
+
// create the value holders
this.parsedValues = new Object[fieldParsers.length];
for (int i = 0; i < fieldParsers.length; i++) {
this.parsedValues[i] = fieldParsers[i].createValue();
}
 
-   this.commentCount = 0;
-   this.invalidLineCount = 0;
-   
// left to right evaluation makes access [0] okay
// this marker is used to fasten up readRecord, so that it 
doesn't have to check each call if the line ending is set to default
if (this.getDelimiter().length == 1 && this.getDelimiter()[0] 
== '\n' ) {
this.lineDelimiterIsLinebreak = 

[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377707#comment-14377707 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018833
  
--- Diff: 
flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
 ---
@@ -19,66 +19,91 @@
 package org.apache.flink.api.scala.operators;
 
 
-import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.GenericCsvInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.util.StringUtils;
 
+import org.apache.flink.types.parser.FieldParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.Map;
-import java.util.TreeMap;
+import java.lang.reflect.Field;
+import java.util.Arrays;
 
-import scala.Product;
-
-public class ScalaCsvInputFormat extends 
GenericCsvInputFormat {
+public class ScalaCsvInputFormat extends GenericCsvInputFormat {
 
private static final long serialVersionUID = 1L;
 
private static final Logger LOG = 
LoggerFactory.getLogger(ScalaCsvInputFormat.class);
-   
-   private transient Object[] parsedValues;
-   
-   // To speed up readRecord processing. Used to find windows line endings.
-   // It is set when open so that readRecord does not have to evaluate it
-   private boolean lineDelimiterIsLinebreak = false;
 
-   private final TupleSerializerBase serializer;
+   private transient Object[] parsedValues;
 
-   private byte[] commentPrefix = null;
+   private final TupleSerializerBase tupleSerializer;
 
-   private transient int commentCount;
-   private transient int invalidLineCount;
+   private Class pojoTypeClass = null;
+   private String[] pojoFieldsName = null;
+   private transient Field[] pojoFields = null;
+   private transient PojoTypeInfo pojoTypeInfo = null;
 
public ScalaCsvInputFormat(Path filePath, TypeInformation 
typeInfo) {
super(filePath);
 
-   if (!(typeInfo.isTupleType())) {
-   throw new UnsupportedOperationException("This only 
works on tuple types.");
+   Class[] classes = new Class[typeInfo.getArity()];
+
+   if (typeInfo instanceof TupleTypeInfoBase) {
+   TupleTypeInfoBase tupleType = 
(TupleTypeInfoBase) typeInfo;
+   // We can use an empty config here, since we only use 
the serializer to create
+   // the top-level case class
+   tupleSerializer = (TupleSerializerBase) 
tupleType.createSerializer(new ExecutionConfig());
+
+   for (int i = 0; i < tupleType.getArity(); i++) {
+   classes[i] = 
tupleType.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldTypes(classes);
+   } else {
+   tupleSerializer = null;
+   pojoTypeInfo = (PojoTypeInfo) typeInfo;
+   pojoTypeClass = typeInfo.getTypeClass();
+   pojoFieldsName = pojoTypeInfo.getFieldNames();
+
+   for (int i = 0, arity = pojoTypeInfo.getArity(); i < 
arity; i++) {
+   classes[i] = 
pojoTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldTypes(classes);
+   setFieldsOrder(pojoFieldsName);
+   }
+   }
+
+   public void setFieldsOrder(String[] fieldsOrder) {
+   Preconditions.checkNotNull(pojoTypeClass, "Field ordering 
feature can be used only with POJO fields.");
--- End diff --

Use the same error messages as in the Java CsvInputFormat.
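The checks under discussion can be sketched in isolation (illustrative names, not the actual `CsvInputFormat` code): a user-supplied field order should be non-null, should match the number of columns being read, and should name only fields the POJO actually has.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class FieldOrderValidation {
    // Validate a user-supplied field order against the POJO's known field
    // names; a sketch of the validation discussed in the review.
    public static void checkFieldOrder(String[] order, Set<String> pojoFields,
                                       int includedColumns) {
        if (order == null) {
            throw new NullPointerException("Field order must not be null.");
        }
        if (order.length != includedColumns) {
            throw new IllegalArgumentException(
                "Field order has " + order.length + " entries but "
                + includedColumns + " columns are read.");
        }
        for (String name : order) {
            if (!pojoFields.contains(name)) {
                throw new IllegalArgumentException("Unknown POJO field: " + name);
            }
        }
    }

    public static void main(String[] args) {
        Set<String> fields = new HashSet<>(Arrays.asList("name", "age"));
        checkFieldOrder(new String[] {"age", "name"}, fields, 2);
        System.out.println("field order accepted");
    }
}
```

Flink uses Guava's `Preconditions` for the same purpose; the plain exceptions here just keep the sketch dependency-free.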



[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377706#comment-14377706 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018812
  
--- Diff: 
flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
 ---
@@ -19,66 +19,91 @@
 package org.apache.flink.api.scala.operators;
 
 
-import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.GenericCsvInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.parser.FieldParser;
-import org.apache.flink.util.StringUtils;
 
+import org.apache.flink.types.parser.FieldParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.IllegalCharsetNameException;
-import java.nio.charset.UnsupportedCharsetException;
-import java.util.Map;
-import java.util.TreeMap;
+import java.lang.reflect.Field;
+import java.util.Arrays;
 
-import scala.Product;
-
-public class ScalaCsvInputFormat extends 
GenericCsvInputFormat {
+public class ScalaCsvInputFormat extends GenericCsvInputFormat {
 
private static final long serialVersionUID = 1L;
 
private static final Logger LOG = 
LoggerFactory.getLogger(ScalaCsvInputFormat.class);
-   
-   private transient Object[] parsedValues;
-   
-   // To speed up readRecord processing. Used to find windows line endings.
-   // It is set when open so that readRecord does not have to evaluate it
-   private boolean lineDelimiterIsLinebreak = false;
 
-   private final TupleSerializerBase serializer;
+   private transient Object[] parsedValues;
 
-   private byte[] commentPrefix = null;
+   private final TupleSerializerBase tupleSerializer;
 
-   private transient int commentCount;
-   private transient int invalidLineCount;
+   private Class pojoTypeClass = null;
+   private String[] pojoFieldsName = null;
+   private transient Field[] pojoFields = null;
+   private transient PojoTypeInfo pojoTypeInfo = null;
 
public ScalaCsvInputFormat(Path filePath, TypeInformation 
typeInfo) {
super(filePath);
 
-   if (!(typeInfo.isTupleType())) {
-   throw new UnsupportedOperationException("This only 
works on tuple types.");
+   Class[] classes = new Class[typeInfo.getArity()];
+
+   if (typeInfo instanceof TupleTypeInfoBase) {
+   TupleTypeInfoBase tupleType = 
(TupleTypeInfoBase) typeInfo;
+   // We can use an empty config here, since we only use 
the serializer to create
+   // the top-level case class
+   tupleSerializer = (TupleSerializerBase) 
tupleType.createSerializer(new ExecutionConfig());
+
+   for (int i = 0; i < tupleType.getArity(); i++) {
+   classes[i] = 
tupleType.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldTypes(classes);
+   } else {
+   tupleSerializer = null;
+   pojoTypeInfo = (PojoTypeInfo) typeInfo;
+   pojoTypeClass = typeInfo.getTypeClass();
+   pojoFieldsName = pojoTypeInfo.getFieldNames();
+
+   for (int i = 0, arity = pojoTypeInfo.getArity(); i < 
arity; i++) {
+   classes[i] = 
pojoTypeInfo.getTypeAt(i).getTypeClass();
+   }
+
+   setFieldTypes(classes);
+   setFieldsOrder(pojoFieldsName);
+   }
+   }
+
+   public void setFieldsOrder(String[] fieldsOrder) {
--- End diff --

Rename method to `setOrderOfPOJOFields`



[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377703#comment-14377703 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018715
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +228,21 @@ public OUT readRecord(OUT reuse, byte[] bytes, int 
offset, int numBytes) throws
}

if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-   // valid parse, map values into pact record
-   for (int i = 0; i < parsedValues.length; i++) {
-   reuse.setField(parsedValues[i], i);
+   if (pojoTypeClass == null) {
+   // result type is tuple
+   Tuple result = (Tuple) reuse;
+   for (int i = 0; i < parsedValues.length; i++) {
+   result.setField(parsedValues[i], i);
+   }
+   } else {
+   // result type is POJO
+   for (int i = 0; i < parsedValues.length; i++) {
+   try {
+   pojoFields[i].set(reuse, 
parsedValues[i]);
+   } catch (IllegalAccessException e) {
+   throw new RuntimeException(e);
--- End diff --

Add a message like `"Parsed value could not be set in POJO field \"" + 
pojoFieldsName[i] + "\""`
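The suggested improvement can be sketched as a stand-alone helper (hypothetical names, not the merged code) that wraps the checked reflection exception with a message naming the offending field:

```java
public class PojoFieldSetter {
    // Illustrative POJO; the class and field names are hypothetical.
    public static class Record {
        public int value;
    }

    // Set a parsed value into a named POJO field, rethrowing reflection
    // failures with a message that identifies the field, as suggested
    // in the review.
    public static void setByName(Object pojo, String fieldName, Object parsed) {
        try {
            pojo.getClass().getField(fieldName).set(pojo, parsed);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(
                "Parsed value could not be set in POJO field \"" + fieldName + "\"", e);
        }
    }

    public static void main(String[] args) {
        Record r = new Record();
        setByName(r, "value", 42);
        System.out.println(r.value); // prints "42"
    }
}
```

Catching `ReflectiveOperationException` covers both `IllegalAccessException` and `NoSuchFieldException`, so a bad field name produces the same descriptive error as an inaccessible field.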




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377700#comment-14377700 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018597
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

public static final String DEFAULT_FIELD_DELIMITER = ",";
 
-
private transient Object[] parsedValues;
-   
-   private byte[] commentPrefix = null;
-   
-   // To speed up readRecord processing. Used to find windows line endings.
-   // It is set when open so that readRecord does not have to evaluate it
-   private boolean lineDelimiterIsLinebreak = false;
-   
-   private transient int commentCount;
 
-   private transient int invalidLineCount;
-   
-   
-   public CsvInputFormat(Path filePath) {
-   super(filePath);
-   }   
-   
-   public CsvInputFormat(Path filePath, Class ... types) {
-   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
types);
-   }   
+   private Class pojoTypeClass = null;
+   private String[] pojoFieldsName = null;
+   private transient Field[] pojoFields = null;
+   private transient PojoTypeInfo pojoTypeInfo = null;
+
+   public CsvInputFormat(Path filePath, TypeInformation 
typeInformation) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
typeInformation);
+   }

-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, Class... types) {
+   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, TypeInformation typeInformation) {
super(filePath);
 
+   Preconditions.checkArgument(typeInformation instanceof 
CompositeType);
+   CompositeType compositeType = (CompositeType) 
typeInformation;
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);
 
-   setFieldTypes(types);
-   }
-   
-   
-   public byte[] getCommentPrefix() {
-   return commentPrefix;
-   }
-   
-   public void setCommentPrefix(byte[] commentPrefix) {
-   this.commentPrefix = commentPrefix;
-   }
-   
-   public void setCommentPrefix(char commentPrefix) {
-   setCommentPrefix(String.valueOf(commentPrefix));
-   }
-   
-   public void setCommentPrefix(String commentPrefix) {
-   setCommentPrefix(commentPrefix, Charsets.UTF_8);
-   }
-   
-   public void setCommentPrefix(String commentPrefix, String charsetName) 
throws IllegalCharsetNameException, UnsupportedCharsetException {
-   if (charsetName == null) {
-   throw new IllegalArgumentException("Charset name must 
not be null");
+   Class[] classes = new Class[typeInformation.getArity()];
+   for (int i = 0, arity = typeInformation.getArity(); i < arity; 
i++) {
+   classes[i] = compositeType.getTypeAt(i).getTypeClass();
}
-   
-   if (commentPrefix != null) {
-   Charset charset = Charset.forName(charsetName);
-   setCommentPrefix(commentPrefix, charset);
-   } else {
-   this.commentPrefix = null;
+   setFieldTypes(classes);
+
+   if (typeInformation instanceof PojoTypeInfo) {
+   pojoTypeInfo = (PojoTypeInfo) typeInformation;
+   pojoTypeClass = typeInformation.getTypeClass();
+   pojoFieldsName = compositeType.getFieldNames();
+   setFieldsOrder(pojoFieldsName);
}
}
-   
-   public void setCommentPrefix(String commentPrefix, Charset charset) {
-   if (charset == null) {
-   throw new IllegalArgumentException("Charset must not be 
null");
+
+   public void setFieldsOrder(String[] fieldsOrder) {
+   Preconditions.checkNotNull(pojoTypeClass, "Field ordering 
feature can be used only with POJO fields.");
+   Preconditions.checkNotNull(fieldsOrder);
+
+   int includedCount = 0;
+   for (boolean isIncluded : fieldIncluded) {
+   if (isIncluded) {
+   includedCount++;
+   }
}
-   if (commentPrefix != null) {
-   this.commentPrefix = commentPrefix.getBytes(charset);
-   } else {
-   this.commentPrefix = null;
+
   

[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377698#comment-14377698 ]

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018473
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

public static final String DEFAULT_FIELD_DELIMITER = ",";
 
-
private transient Object[] parsedValues;
-   
-   private byte[] commentPrefix = null;
-   
-   // To speed up readRecord processing. Used to find windows line endings.
-   // It is set when open so that readRecord does not have to evaluate it
-   private boolean lineDelimiterIsLinebreak = false;
-   
-   private transient int commentCount;
 
-   private transient int invalidLineCount;
-   
-   
-   public CsvInputFormat(Path filePath) {
-   super(filePath);
-   }   
-   
-   public CsvInputFormat(Path filePath, Class ... types) {
-   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
types);
-   }   
+   private Class pojoTypeClass = null;
+   private String[] pojoFieldsName = null;
+   private transient Field[] pojoFields = null;
+   private transient PojoTypeInfo pojoTypeInfo = null;
+
+   public CsvInputFormat(Path filePath, TypeInformation 
typeInformation) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
typeInformation);
+   }

-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, Class... types) {
+   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, TypeInformation typeInformation) {
super(filePath);
 
+   Preconditions.checkArgument(typeInformation instanceof 
CompositeType);
+   CompositeType compositeType = (CompositeType) 
typeInformation;
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);
 
-   setFieldTypes(types);
-   }
-   
-   
-   public byte[] getCommentPrefix() {
-   return commentPrefix;
-   }
-   
-   public void setCommentPrefix(byte[] commentPrefix) {
-   this.commentPrefix = commentPrefix;
-   }
-   
-   public void setCommentPrefix(char commentPrefix) {
-   setCommentPrefix(String.valueOf(commentPrefix));
-   }
-   
-   public void setCommentPrefix(String commentPrefix) {
-   setCommentPrefix(commentPrefix, Charsets.UTF_8);
-   }
-   
-   public void setCommentPrefix(String commentPrefix, String charsetName) 
throws IllegalCharsetNameException, UnsupportedCharsetException {
-   if (charsetName == null) {
-   throw new IllegalArgumentException("Charset name must 
not be null");
+   Class[] classes = new Class[typeInformation.getArity()];
+   for (int i = 0, arity = typeInformation.getArity(); i < arity; 
i++) {
+   classes[i] = compositeType.getTypeAt(i).getTypeClass();
}
-   
-   if (commentPrefix != null) {
-   Charset charset = Charset.forName(charsetName);
-   setCommentPrefix(commentPrefix, charset);
-   } else {
-   this.commentPrefix = null;
+   setFieldTypes(classes);
+
+   if (typeInformation instanceof PojoTypeInfo) {
+   pojoTypeInfo = (PojoTypeInfo) typeInformation;
+   pojoTypeClass = typeInformation.getTypeClass();
+   pojoFieldsName = compositeType.getFieldNames();
+   setFieldsOrder(pojoFieldsName);
}
}
-   
-   public void setCommentPrefix(String commentPrefix, Charset charset) {
-   if (charset == null) {
-   throw new IllegalArgumentException("Charset must not be 
null");
+
+   public void setFieldsOrder(String[] fieldsOrder) {
+   Preconditions.checkNotNull(pojoTypeClass, "Field ordering 
feature can be used only with POJO fields.");
+   Preconditions.checkNotNull(fieldsOrder);
+
+   int includedCount = 0;
+   for (boolean isIncluded : fieldIncluded) {
+   if (isIncluded) {
+   includedCount++;
+   }
}
-   if (commentPrefix != null) {
-   this.commentPrefix = commentPrefix.getBytes(charset);
-   } else {
-   this.commentPrefix = null;
+
   

[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377691#comment-14377691
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018316
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

public static final String DEFAULT_FIELD_DELIMITER = ",";
 
-
private transient Object[] parsedValues;
-   
-   private byte[] commentPrefix = null;
-   
-   // To speed up readRecord processing. Used to find windows line endings.
-   // It is set when open so that readRecord does not have to evaluate it
-   private boolean lineDelimiterIsLinebreak = false;
-   
-   private transient int commentCount;
 
-   private transient int invalidLineCount;
-   
-   
-   public CsvInputFormat(Path filePath) {
-   super(filePath);
-   }   
-   
-   public CsvInputFormat(Path filePath, Class ... types) {
-   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
types);
-   }   
+   private Class pojoTypeClass = null;
+   private String[] pojoFieldsName = null;
+   private transient Field[] pojoFields = null;
+   private transient PojoTypeInfo pojoTypeInfo = null;
+
+   public CsvInputFormat(Path filePath, TypeInformation 
typeInformation) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
typeInformation);
+   }

-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, Class... types) {
+   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, TypeInformation typeInformation) {
super(filePath);
 
+   Preconditions.checkArgument(typeInformation instanceof 
CompositeType);
+   CompositeType compositeType = (CompositeType) 
typeInformation;
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);
 
-   setFieldTypes(types);
-   }
-   
-   
-   public byte[] getCommentPrefix() {
-   return commentPrefix;
-   }
-   
-   public void setCommentPrefix(byte[] commentPrefix) {
-   this.commentPrefix = commentPrefix;
-   }
-   
-   public void setCommentPrefix(char commentPrefix) {
-   setCommentPrefix(String.valueOf(commentPrefix));
-   }
-   
-   public void setCommentPrefix(String commentPrefix) {
-   setCommentPrefix(commentPrefix, Charsets.UTF_8);
-   }
-   
-   public void setCommentPrefix(String commentPrefix, String charsetName) 
throws IllegalCharsetNameException, UnsupportedCharsetException {
-   if (charsetName == null) {
-   throw new IllegalArgumentException("Charset name must 
not be null");
+   Class[] classes = new Class[typeInformation.getArity()];
+   for (int i = 0, arity = typeInformation.getArity(); i < arity; 
i++) {
+   classes[i] = compositeType.getTypeAt(i).getTypeClass();
}
-   
-   if (commentPrefix != null) {
-   Charset charset = Charset.forName(charsetName);
-   setCommentPrefix(commentPrefix, charset);
-   } else {
-   this.commentPrefix = null;
+   setFieldTypes(classes);
+
+   if (typeInformation instanceof PojoTypeInfo) {
+   pojoTypeInfo = (PojoTypeInfo) typeInformation;
+   pojoTypeClass = typeInformation.getTypeClass();
+   pojoFieldsName = compositeType.getFieldNames();
+   setFieldsOrder(pojoFieldsName);
}
}
-   
-   public void setCommentPrefix(String commentPrefix, Charset charset) {
-   if (charset == null) {
-   throw new IllegalArgumentException("Charset must not be 
null");
+
+   public void setFieldsOrder(String[] fieldsOrder) {
--- End diff --

Change method name to `setOrderOfPOJOFields`?


> Add CsvReader for reading into POJOs.
> -
>
> Key: FLINK-1512
> URL: https://issues.apache.org/jira/browse/FLINK-1512
> Project: Flink
>  Issue Type: New Feature
>  Components: Java API, Scala API
>Reporter: Robert Metzger
>Assignee: Chiwan Park
>Priority: Minor
>  Labels: starter
>
> Currently, the {{CsvReader}} supports only TupleXX types. 
> It would be nice if users were also able to read into POJOs.

[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377690#comment-14377690
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018277
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -52,103 +50,86 @@

public static final String DEFAULT_FIELD_DELIMITER = ",";
 
-
private transient Object[] parsedValues;
-   
-   private byte[] commentPrefix = null;
-   
-   // To speed up readRecord processing. Used to find windows line endings.
-   // It is set when open so that readRecord does not have to evaluate it
-   private boolean lineDelimiterIsLinebreak = false;
-   
-   private transient int commentCount;
 
-   private transient int invalidLineCount;
-   
-   
-   public CsvInputFormat(Path filePath) {
-   super(filePath);
-   }   
-   
-   public CsvInputFormat(Path filePath, Class ... types) {
-   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
types);
-   }   
+   private Class pojoTypeClass = null;
+   private String[] pojoFieldsName = null;
+   private transient Field[] pojoFields = null;
+   private transient PojoTypeInfo pojoTypeInfo = null;
+
+   public CsvInputFormat(Path filePath, TypeInformation 
typeInformation) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
typeInformation);
+   }

-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, Class... types) {
+   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, TypeInformation typeInformation) {
super(filePath);
 
+   Preconditions.checkArgument(typeInformation instanceof 
CompositeType);
+   CompositeType compositeType = (CompositeType) 
typeInformation;
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);
 
-   setFieldTypes(types);
-   }
-   
-   
-   public byte[] getCommentPrefix() {
-   return commentPrefix;
-   }
-   
-   public void setCommentPrefix(byte[] commentPrefix) {
-   this.commentPrefix = commentPrefix;
-   }
-   
-   public void setCommentPrefix(char commentPrefix) {
-   setCommentPrefix(String.valueOf(commentPrefix));
-   }
-   
-   public void setCommentPrefix(String commentPrefix) {
-   setCommentPrefix(commentPrefix, Charsets.UTF_8);
-   }
-   
-   public void setCommentPrefix(String commentPrefix, String charsetName) 
throws IllegalCharsetNameException, UnsupportedCharsetException {
-   if (charsetName == null) {
-   throw new IllegalArgumentException("Charset name must 
not be null");
+   Class[] classes = new Class[typeInformation.getArity()];
+   for (int i = 0, arity = typeInformation.getArity(); i < arity; 
i++) {
+   classes[i] = compositeType.getTypeAt(i).getTypeClass();
}
-   
-   if (commentPrefix != null) {
-   Charset charset = Charset.forName(charsetName);
-   setCommentPrefix(commentPrefix, charset);
-   } else {
-   this.commentPrefix = null;
+   setFieldTypes(classes);
+
+   if (typeInformation instanceof PojoTypeInfo) {
+   pojoTypeInfo = (PojoTypeInfo) typeInformation;
+   pojoTypeClass = typeInformation.getTypeClass();
+   pojoFieldsName = compositeType.getFieldNames();
+   setFieldsOrder(pojoFieldsName);
}
}
-   
-   public void setCommentPrefix(String commentPrefix, Charset charset) {
-   if (charset == null) {
-   throw new IllegalArgumentException("Charset must not be 
null");
+
+   public void setFieldsOrder(String[] fieldsOrder) {
+   Preconditions.checkNotNull(pojoTypeClass, "Field ordering 
feature can be used only with POJO fields.");
--- End diff --

How about `"Field order can only be specified if output type is a POJO"`



[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377681#comment-14377681
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r27018151
  
--- Diff: 
flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
 ---
@@ -98,98 +123,66 @@ public void setFields(int[] sourceFieldIndices, 
Class[] fieldTypes) {
setFieldsGeneric(sourceFieldIndices, fieldTypes);
}
 
-   public byte[] getCommentPrefix() {
-   return commentPrefix;
-   }
-
-   public void setCommentPrefix(byte[] commentPrefix) {
-   this.commentPrefix = commentPrefix;
-   }
-
-   public void setCommentPrefix(char commentPrefix) {
-   setCommentPrefix(String.valueOf(commentPrefix));
-   }
+   public void setFields(boolean[] sourceFieldMask, Class[] fieldTypes) 
{
+   Preconditions.checkNotNull(sourceFieldMask);
+   Preconditions.checkNotNull(fieldTypes);
 
-   public void setCommentPrefix(String commentPrefix) {
-   setCommentPrefix(commentPrefix, Charsets.UTF_8);
+   setFieldsGeneric(sourceFieldMask, fieldTypes);
}
 
-   public void setCommentPrefix(String commentPrefix, String charsetName) 
throws IllegalCharsetNameException, UnsupportedCharsetException {
-   if (charsetName == null) {
-   throw new IllegalArgumentException("Charset name must 
not be null");
-   }
-
-   if (commentPrefix != null) {
-   Charset charset = Charset.forName(charsetName);
-   setCommentPrefix(commentPrefix, charset);
-   } else {
-   this.commentPrefix = null;
-   }
+   public Class[] getFieldTypes() {
+   return super.getGenericFieldTypes();
}
 
-   public void setCommentPrefix(String commentPrefix, Charset charset) {
-   if (charset == null) {
-   throw new IllegalArgumentException("Charset must not be 
null");
-   }
-   if (commentPrefix != null) {
-   this.commentPrefix = commentPrefix.getBytes(charset);
-   } else {
-   this.commentPrefix = null;
-   }
-   }
-
-   @Override
-   public void close() throws IOException {
--- End diff --

Did the `close()` method get lost in the refactoring?





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377664#comment-14377664
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85447697
  
I see the issue with the non-deterministic field order and FLINK-1665 as 
follows. Both FLINK-1665 and Option 3 solve the problem of non-deterministic 
field order:

- FLINK-1665 by specifying the order in the POJO using annotations.
- Option 3 by forcing the user to explicitly specify the field order.

While we don't have FLINK-1665 implemented, I would go with Option 3. Once 
we have FLINK-1665 we could relax it for POJOs with `@Position` annotations.







[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377067#comment-14377067
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85285193
  
@fhueske Hi, thanks for your kind advice! I will fix them soon.

About the order of POJO fields, I also think that option 3 is good. 
However, [FLINK-1665](https://issues.apache.org/jira/browse/FLINK-1665) is not 
implemented yet. I will implement options 1 and 2 now. After FLINK-1665 is 
completed, we can implement option 3.

About the inheritance of `ScalaCsvInputFormat`, I didn't think about the Record 
API. Your suggestion sounds good. I will revert the changes and refactor 
`GenericCsvInputFormat` to contain the duplicated methods.







[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14377000#comment-14377000
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-85261367
  
Hi @chiwanpark, the PR has improved a lot! Besides a few inline comments, the 
logic and test coverage look pretty good. Unfortunately, the PR does not seem 
to build correctly (Check 
[Travis](https://travis-ci.org/apache/flink/builds/54853098)). I'll do another 
quick pass tomorrow and have a closer look at the error messages. 

IMO, there are just two things left to be discussed before we can test it 
on a cluster:

**Order of POJO fields**
This is actually a crucial point. We need to make sure that the field 
order is deterministic for all setups. I think (not sure!) that right now the order 
of the fields depends on the order in which the fields are returned by Java's 
reflection tools. This order needs to be standardized to ensure that all JVMs 
that obey the standard are compatible. However, even a standardized order 
(lexicographic order) might not be very helpful.

There are several options here:
1. If the order of fields is standardized, just use that by default.
2. If not, we can deterministically sort the fields ourselves in the 
PojoTypeInfo.
3. We make the definition of a field order mandatory until we can define 
the order of POJO fields (e.g., via the proposed `@Position` annotation).
I am leaning towards option 3. 
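For illustration, option 2 could be sketched with plain JDK reflection. This is a hedged sketch only: `FieldOrderDemo`, `Pojo`, and `sortedFieldNames` are hypothetical names for this example, not Flink API, and the point is merely that `getDeclaredFields()` has no guaranteed order, so a deterministic sort must be imposed:

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Comparator;

public class FieldOrderDemo {
    static class Pojo {
        public int b;
        public String a;
        public double c;
    }

    // getDeclaredFields() returns fields "in no particular order" per the
    // JDK javadoc, so we sort them lexicographically to get a deterministic
    // order on every JVM (option 2 from the discussion above).
    static String[] sortedFieldNames(Class<?> clazz) {
        Field[] fields = clazz.getDeclaredFields();
        Arrays.sort(fields, Comparator.comparing(Field::getName));
        return Arrays.stream(fields).map(Field::getName).toArray(String[]::new);
    }

    public static void main(String[] args) {
        // prints "a,b,c" regardless of the JVM's reflection order
        System.out.println(String.join(",", sortedFieldNames(Pojo.class)));
    }
}
```

As the comment notes, a lexicographic order is deterministic but not necessarily meaningful for CSV columns, which is why option 3 (explicit user-specified order) is preferred here.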

**Changed Inheritance of ScalaCsvInputFormat**
This is a very good observation, however I would keep the original 
inheritance for now. At the moment, GenericCsvInputFormat is extended by 
CsvInputFormat (for Java API), ScalaCsvInputFormat (for Scala API), 
CsvInputFormat (for Record API), and a test class. We will soon remove the 
Record API such that only the Java and the Scala API will remain. I think we can 
then move all common operations to GenericCsvInputFormat (maybe we can do that 
already...). @chiwanpark does that make sense to you or do you disagree?

@chiwanpark What do you think? Any other opinions?







[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376976#comment-14376976
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26994167
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/io/CsvReaderWithPOJOITCase.java 
---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.io;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class CsvReaderWithPOJOITCase extends MultipleProgramsTestBase {
+   private String resultPath;
+   private String expected;
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
+   public CsvReaderWithPOJOITCase(ExecutionMode mode) {
+   super(mode);
+   }
+
+   @Before
+   public void before() throws Exception {
+   resultPath = tempFolder.newFile("result").toURI().toString();
+   }
+
+   @After
+   public void after() throws Exception {
+   compareResultsByLinesInMemory(expected, resultPath);
+   }
+
+   private String createInputData(String data) throws Exception {
+   File file = tempFolder.newFile("input");
+   Files.write(data, file, Charsets.UTF_8);
+
+   return file.toURI().toString();
+   }
+
+   @Test
+   public void testPOJOType() throws Exception {
+   final String dataPath = 
createInputData("ABC,2.20,3\nDEF,5.1,5\nDEF,3.30,1\nGHI,3.30,10");
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   DataSet data = 
env.readCsvFile(dataPath).pojoType(POJOItem.class, new String[]{"f1", "f3", 
"f2"});
+   DataSet> result = data.map(new 
POJOToTupleMapper()).groupBy(0).sum(1);
--- End diff --

I would remove the `POJOToTupleMapper` and the aggregation and instead 
directly emit the POJO data with `writeAsText()`. This will use the 
`toString()` method of the POJO to write the data.
Simplifies the test and restricts it to the essential part.
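A minimal sketch of the suggested simplification: if the POJO's `toString()` carries the output format, `writeAsText()` can emit it directly and no POJO-to-Tuple mapper is needed. The `WordCountPojo` class below is a hypothetical stand-in, not code from the PR:

```java
public class WordCountPojo {
    public String word;
    public int count;

    // POJOs need a public no-arg constructor
    public WordCountPojo() {}

    // writeAsText() serializes records via toString(), so defining it here
    // lets the test compare plain text lines without an extra MapFunction.
    @Override
    public String toString() {
        return word + "," + count;
    }

    public static void main(String[] args) {
        WordCountPojo p = new WordCountPojo();
        p.word = "ABC";
        p.count = 3;
        System.out.println(p);  // prints "ABC,3"
    }
}
```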







[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376970#comment-14376970
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26993897
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
 ---
@@ -247,16 +252,27 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   inputFormat.enableQuotedStringParsing(quoteCharacter);
 }
 
-val classes: Array[Class[_]] = new Array[Class[_]](typeInfo.getArity)
+val classesBuf: ArrayBuffer[Class[_]] = new ArrayBuffer[Class[_]]
 for (i <- 0 until typeInfo.getArity) {
-  classes(i) = typeInfo.getTypeAt(i).getTypeClass
+  typeInfo match {
+case info: TupleTypeInfoBase[T] => classesBuf += 
info.getTypeAt(i).getTypeClass()
+case info: PojoTypeInfo[T] =>
+  if (includedFields == null || includedFields.indexOf(i) != -1) {
--- End diff --

Just add all POJO fields without checking against `includedFields`. 
`includedFields` refers to the fields of the CSV file, not to the fields of the 
POJO.
The later check ensures that the number of read fields and the number of 
POJO/Tuple fields are equal.
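The relationship between the column mask and the POJO field count can be sketched as follows. This is a simplified, hypothetical stand-in for the format's internal check; `IncludedFieldsCheck` and `countIncluded` are illustrative names:

```java
public class IncludedFieldsCheck {
    // The boolean mask selects columns of the CSV file; the number of
    // selected columns must equal the number of POJO/Tuple fields.
    static int countIncluded(boolean[] fieldIncluded) {
        int includedCount = 0;
        for (boolean isIncluded : fieldIncluded) {
            if (isIncluded) {
                includedCount++;
            }
        }
        return includedCount;
    }

    public static void main(String[] args) {
        boolean[] mask = {true, false, true, true};  // read CSV columns 0, 2, 3
        int pojoFieldCount = 3;
        if (countIncluded(mask) != pojoFieldCount) {
            throw new IllegalArgumentException("Number of selected CSV columns ("
                    + countIncluded(mask) + ") must match number of POJO fields ("
                    + pojoFieldCount + ")");
        }
        System.out.println("mask selects " + countIncluded(mask) + " columns");
    }
}
```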







[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376952#comment-14376952
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26993339
  
--- Diff: 
flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
 ---
@@ -219,73 +92,24 @@ public OUT readRecord(OUT reuse, byte[] bytes, int 
offset, int numBytes) {
}
 
if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-   OUT result = serializer.createInstance(parsedValues);
-   return result;
+   if (tupleSerializer != null) {
+   return 
tupleSerializer.createInstance(parsedValues);
+   } else {
+   try {
+   for (int i = 0; i < pojoFields.length; 
i++) {
+   pojoFields[i].set(reuse, 
parsedValues[i]);
+   }
+   } catch (IllegalAccessException e) {
+   LOG.error("Cannot set value to given 
type object!", e);
--- End diff --

same as in the `CsvInputFormat`. Forward the exception to kill the job.
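A hedged sketch of this suggestion: instead of logging and swallowing the `IllegalAccessException`, wrap and rethrow it so the failure propagates and cancels the job. The `PojoFieldSetter` class and its names are hypothetical, not the PR's actual code:

```java
import java.lang.reflect.Field;

public class PojoFieldSetter {
    public static class Item {
        public int field1;
    }

    // Rethrow instead of LOG.error(...): a swallowed exception would leave
    // the record silently half-populated, while a RuntimeException fails
    // the task and surfaces the error to the user.
    static void setField(Object target, Field field, Object value) {
        try {
            field.set(target, value);
        } catch (IllegalAccessException e) {
            throw new RuntimeException("Parsed value could not be set in POJO field \""
                    + field.getName() + "\"", e);
        }
    }

    public static void main(String[] args) throws Exception {
        Item item = new Item();
        setField(item, Item.class.getField("field1"), 123);
        System.out.println(item.field1);  // prints 123
    }
}
```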







[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376943#comment-14376943
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26993011
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
---
@@ -684,4 +693,249 @@ private void testRemovingTrailingCR(String 
lineBreakerInFile, String lineBreaker
}
}
 
+   private void validatePojoItem(CsvInputFormat format) throws 
Exception {
+   PojoItem item = new PojoItem();
+
+   format.nextRecord(item);
+
+   assertEquals(123, item.field1);
+   assertEquals("AAA", item.field2);
+   assertEquals(Double.valueOf(3.123), item.field3);
+   assertEquals("BBB", item.field4);
+
+   format.nextRecord(item);
+
+   assertEquals(456, item.field1);
+   assertEquals("BBB", item.field2);
+   assertEquals(Double.valueOf(1.123), item.field3);
+   assertEquals("AAA", item.field4);
+   }
+
+   @Test
+   public void testPojoType() throws Exception {
+   File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+   tempFile.deleteOnExit();
+   tempFile.setWritable(true);
+
+   OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(tempFile));
+   wrt.write("123,AAA,3.123,BBB\n");
+   wrt.write("456,BBB,1.123,AAA\n");
+   wrt.close();
+
+   @SuppressWarnings("unchecked")
+   TypeInformation typeInfo = 
(TypeInformation) TypeExtractor.createTypeInfo(PojoItem.class);
+   CsvInputFormat inputFormat = new 
CsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo);
+
+   inputFormat.configure(new Configuration());
+   FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+   inputFormat.open(splits[0]);
+
+   validatePojoItem(inputFormat);
+   }
+
+   @Test
+   public void testPojoTypeWithPrivateField() throws Exception {
+   File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+   tempFile.deleteOnExit();
+   tempFile.setWritable(true);
+
+   OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(tempFile));
+   wrt.write("123,AAA,3.123,BBB\n");
+   wrt.write("456,BBB,1.123,AAA\n");
+   wrt.close();
+
+   @SuppressWarnings("unchecked")
+   TypeInformation typeInfo = 
(TypeInformation) 
TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+   CsvInputFormat inputFormat = new 
CsvInputFormat(new Path(tempFile.toURI().toString()), 
typeInfo);
+
+   inputFormat.configure(new Configuration());
+
+   FileInputSplit[] splits = inputFormat.createInputSplits(1);
+   inputFormat.open(splits[0]);
+
+   PrivatePojoItem item = new PrivatePojoItem();
+   inputFormat.nextRecord(item);
+
+   assertEquals(123, item.field1);
+   assertEquals("AAA", item.field2);
+   assertEquals(Double.valueOf(3.123), item.field3);
+   assertEquals("BBB", item.field4);
+
+   inputFormat.nextRecord(item);
+
+   assertEquals(456, item.field1);
+   assertEquals("BBB", item.field2);
+   assertEquals(Double.valueOf(1.123), item.field3);
+   assertEquals("AAA", item.field4);
+   }
+
+   @Test
+   public void testPojoTypeWithMappingInformation() throws Exception {
+   File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+   tempFile.deleteOnExit();
+   tempFile.setWritable(true);
+
+   OutputStreamWriter wrt = new OutputStreamWriter(new 
FileOutputStream(tempFile));
+   wrt.write("123,3.123,AAA,BBB\n");
+   wrt.write("456,1.123,BBB,AAA\n");
+   wrt.close();
+
+   @SuppressWarnings("unchecked")
+   TypeInformation typeInfo = 
(TypeInformation) TypeExtractor.createTypeInfo(PojoItem.class);
+   CsvInputFormat inputFormat = new 
CsvInputFormat(new Path(tempFile.toURI().toString()), typeInfo);
+   inputFormat.setFieldsOrder(new String[]{"field1", "field3", 
"field2", "field4"});
+
+   inputFormat.configure(new Configuration());
+   FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+   inputFormat.open(splits[0]);
+
+   validatePojoItem(inputFormat);

[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376939#comment-14376939
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26992871
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
---
@@ -684,4 +693,249 @@ private void testRemovingTrailingCR(String 
lineBreakerInFile, String lineBreaker
}
}
 
+   private void validatePojoItem(CsvInputFormat format) throws 
Exception {
+   PojoItem item = new PojoItem();
+
+   format.nextRecord(item);
+
+   assertEquals(123, item.field1);
+   assertEquals("AAA", item.field2);
+   assertEquals(Double.valueOf(3.123), item.field3);
+   assertEquals("BBB", item.field4);
+
+   format.nextRecord(item);
+
+   assertEquals(456, item.field1);
+   assertEquals("BBB", item.field2);
+   assertEquals(Double.valueOf(1.123), item.field3);
+   assertEquals("AAA", item.field4);
+   }
+
+   @Test
+   public void testPojoType() throws Exception {
+   File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+   tempFile.deleteOnExit();
+   tempFile.setWritable(true);
+
+   OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+   wrt.write("123,AAA,3.123,BBB\n");
+   wrt.write("456,BBB,1.123,AAA\n");
+   wrt.close();
+
+   @SuppressWarnings("unchecked")
+   TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+   CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+   inputFormat.configure(new Configuration());
+   FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+   inputFormat.open(splits[0]);
+
+   validatePojoItem(inputFormat);
+   }
+
+   @Test
+   public void testPojoTypeWithPrivateField() throws Exception {
+   File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+   tempFile.deleteOnExit();
+   tempFile.setWritable(true);
+
+   OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+   wrt.write("123,AAA,3.123,BBB\n");
+   wrt.write("456,BBB,1.123,AAA\n");
+   wrt.close();
+
+   @SuppressWarnings("unchecked")
+   TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+   CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+   inputFormat.configure(new Configuration());
+
+   FileInputSplit[] splits = inputFormat.createInputSplits(1);
+   inputFormat.open(splits[0]);
+
+   PrivatePojoItem item = new PrivatePojoItem();
+   inputFormat.nextRecord(item);
--- End diff --

you can use the `validatePojoItem()` method here as well.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376906#comment-14376906
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26991431
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +301,23 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
}

if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-   // valid parse, map values into pact record
-   for (int i = 0; i < parsedValues.length; i++) {
-   reuse.setField(parsedValues[i], i);
+   if (pojoTypeClass == null) {
+   // result type is tuple
+   Tuple result = (Tuple) reuse;
+   for (int i = 0; i < parsedValues.length; i++) {
+   result.setField(parsedValues[i], i);
+   }
+   } else {
+   // result type is POJO
+   for (int i = 0; i < parsedValues.length; i++) {
+   try {
+   pojoFields[i].set(reuse, parsedValues[i]);
+   } catch (IllegalAccessException e) {
+   LOG.error("Cannot set value to given type object!", e);
--- End diff --

This error will not be caused by invalid data.
You should rethrow this exception to kill the job. If the field cannot be 
set, something is seriously wrong and the job should not continue. 





[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14375858#comment-14375858
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-84977748
  
Hi @chiwanpark 
Thanks for updating the PR! :-)
I was gone for a few days. Will have a look at your PR shortly.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366938#comment-14366938
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-82885499
  
I updated this PR.

* Change method of obtaining `Field` object from using `PojoTypeInfo` to 
saving field names. (Thanks @fhueske for advice!)
* `ScalaCsvInputFormat` extends `CsvInputFormat` because there is a lot of 
duplicated code between the two classes.
* Add integration tests for `CsvReader` (Java API) and 
`ExecutionEnvironment.readCsvFile` (Scala API)

Any feedback is welcome! (especially on the error messages, given my poor 
English)




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14360531#comment-14360531
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-79066240
  
@aljoscha Thanks! I understand it now.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14360159#comment-14360159
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-78897265
  
Hi,
in Java, the CsvOutputFormat used to emit only Tuples. We know that these are 
mutable, therefore they are reused. The ScalaCsvOutputFormat emits case 
classes (Scala Tuples are in fact case classes). These are generally not 
mutable; the fields cannot be changed. Therefore we have to create a new object 
every time.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14360124#comment-14360124
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-78876571
  
Hello. I have a question about object reuse in the `readRecord` method of 
`ScalaCsvInputFormat`. In the Java implementation, `CsvInputFormat` reuses the result 
object. But `ScalaCsvInputFormat` does not reuse the object and creates an instance 
for each record. Why doesn't `ScalaCsvInputFormat` reuse the object?




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14357861#comment-14357861
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-78401359
  
@fhueske Thanks for your kind advice. I will fix it as soon as possible. 




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356960#comment-14356960
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-78276533
  
Hi, thanks for the update! This is really good progress!

I had a look at your code and noticed a few things apart from the inline 
comments.
- PojoTypeInformation is unfortunately not serializable (as I had to learn 
a few days ago myself) because ``java.lang.reflect.Field`` is not serializable. 
So the InputFormat will not work if used in a complete Flink program. What you 
need to do to get the IF working is to add member variables for the POJO class 
and all field names. In the ``open()`` method you need to create the ``Field`` 
objects with ``clazz.getDeclaredField(fieldName)`` and use these fields in the 
``readRecord`` method.

- Add (at least) two end-to-end tests for the Java API and the Scala API 
that use the new Pojo feature. These tests will catch the TypeInformation 
serialization problem.

Please don't hesitate to get in touch if you have any questions!
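The pattern described above can be sketched roughly like this (class and method names are hypothetical; the actual PR code may differ): only the POJO class and its field names are stored as serializable state, and the non-serializable `java.lang.reflect.Field` objects are re-created in `open()` on each worker.

```java
import java.io.Serializable;
import java.lang.reflect.Field;

public class PojoFieldResolver<T> implements Serializable {

    private final Class<T> clazz;        // serializable
    private final String[] fieldNames;   // serializable
    private transient Field[] fields;    // NOT serializable, rebuilt on the worker

    public PojoFieldResolver(Class<T> clazz, String[] fieldNames) {
        this.clazz = clazz;
        this.fieldNames = fieldNames;
    }

    /** Called from open(): resolves the Field objects after deserialization. */
    public void open() {
        fields = new Field[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            try {
                fields[i] = clazz.getDeclaredField(fieldNames[i]);
                fields[i].setAccessible(true); // also covers private POJO fields
            } catch (NoSuchFieldException e) {
                throw new RuntimeException("Field '" + fieldNames[i]
                        + "' not found in " + clazz.getName(), e);
            }
        }
    }

    public Field[] getFields() {
        return fields;
    }

    // Minimal POJO for trying the resolver out.
    public static class Item {
        private int id;
        public String name;
    }
}
```

The `transient` keyword keeps Java serialization from attempting to write the `Field` array, which is exactly the problem described for `PojoTypeInfo`.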






[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356935#comment-14356935
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26216185
  
--- Diff: 
flink-scala/src/main/java/org/apache/flink/api/scala/operators/ScalaCsvInputFormat.java
 ---
@@ -66,18 +66,30 @@
public ScalaCsvInputFormat(Path filePath, TypeInformation<OUT> typeInfo) {
super(filePath);
 
-   if (!(typeInfo.isTupleType())) {
-   throw new UnsupportedOperationException("This only works on tuple types.");
-   }
-   TupleTypeInfoBase<OUT> tupleType = (TupleTypeInfoBase<OUT>) typeInfo;
-   // We can use an empty config here, since we only use the serializer to create
-   // the top-level case class
-   serializer = (TupleSerializerBase<OUT>) tupleType.createSerializer(new ExecutionConfig());
-
-   Class<?>[] classes = new Class<?>[tupleType.getArity()];
-   for (int i = 0; i < tupleType.getArity(); i++) {
-   classes[i] = tupleType.getTypeAt(i).getTypeClass();
+   Class<?>[] classes = new Class<?>[typeInfo.getArity()];
+
+   if (typeInfo instanceof TupleTypeInfoBase) {
--- End diff --

It would be nice to also support Scala case classes. Check out the 
CaseClassTypeInfo for some details if you want to add support for that.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356930#comment-14356930
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26215940
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java 
---
@@ -684,4 +693,178 @@ private void testRemovingTrailingCR(String lineBreakerInFile, String lineBreaker
}
}
 
+   @Test
+   public void testPojoType() throws Exception {
+   File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+   tempFile.deleteOnExit();
+   tempFile.setWritable(true);
+
+   OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+   wrt.write("123,AAA,3.123,BBB\n");
+   wrt.write("456,BBB,1.123,AAA\n");
+   wrt.close();
+
+   @SuppressWarnings("unchecked")
+   TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+   CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+   Configuration parameters = new Configuration();
+   inputFormat.configure(parameters);
+
+   inputFormat.setDelimiter('\n');
+   inputFormat.setFieldDelimiter(',');
+
+   FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+   inputFormat.open(splits[0]);
+
+   PojoItem item = new PojoItem();
+   inputFormat.nextRecord(item);
+
+   assertEquals(123, item.field1);
+   assertEquals("AAA", item.field2);
+   assertEquals(Double.valueOf(3.123), item.field3);
+   assertEquals("BBB", item.field4);
+
+   inputFormat.nextRecord(item);
+
+   assertEquals(456, item.field1);
+   assertEquals("BBB", item.field2);
+   assertEquals(Double.valueOf(1.123), item.field3);
+   assertEquals("AAA", item.field4);
+   }
+
+   @Test
+   public void testPojoTypeWithPrivateField() throws Exception {
+   File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+   tempFile.deleteOnExit();
+   tempFile.setWritable(true);
+
+   OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+   wrt.write("123,AAA,3.123,BBB\n");
+   wrt.write("456,BBB,1.123,AAA\n");
+   wrt.close();
+
+   @SuppressWarnings("unchecked")
+   TypeInformation<PrivatePojoItem> typeInfo = (TypeInformation<PrivatePojoItem>) TypeExtractor.createTypeInfo(PrivatePojoItem.class);
+   CsvInputFormat<PrivatePojoItem> inputFormat = new CsvInputFormat<PrivatePojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+
+   Configuration parameters = new Configuration();
+   inputFormat.configure(parameters);
+
+   inputFormat.setDelimiter('\n');
+   inputFormat.setFieldDelimiter(',');
+
+   FileInputSplit[] splits = inputFormat.createInputSplits(1);
+
+   inputFormat.open(splits[0]);
+
+   PrivatePojoItem item = new PrivatePojoItem();
+   inputFormat.nextRecord(item);
+
+   assertEquals(123, item.field1);
+   assertEquals("AAA", item.field2);
+   assertEquals(Double.valueOf(3.123), item.field3);
+   assertEquals("BBB", item.field4);
+
+   inputFormat.nextRecord(item);
+
+   assertEquals(456, item.field1);
+   assertEquals("BBB", item.field2);
+   assertEquals(Double.valueOf(1.123), item.field3);
+   assertEquals("AAA", item.field4);
+   }
+
+   @Test
+   public void testPojoTypeWithMappingInformation() throws Exception {
+   File tempFile = File.createTempFile("CsvReaderPojoType", "tmp");
+   tempFile.deleteOnExit();
+   tempFile.setWritable(true);
+
+   OutputStreamWriter wrt = new OutputStreamWriter(new FileOutputStream(tempFile));
+   wrt.write("123,3.123,AAA,BBB\n");
+   wrt.write("456,1.123,BBB,AAA\n");
+   wrt.close();
+
+   @SuppressWarnings("unchecked")
+   TypeInformation<PojoItem> typeInfo = (TypeInformation<PojoItem>) TypeExtractor.createTypeInfo(PojoItem.class);
+   CsvInputFormat<PojoItem> inputFormat = new CsvInputFormat<PojoItem>(new Path(tempFile.toURI().toString()), typeInfo);
+   inputFormat.setFieldsMap(new String[]{"field1", "field3", "field2", "field4"});
+
+   Configuration parameters = new Configuration();

[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356791#comment-14356791
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26206381
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +291,29 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
}

if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-   // valid parse, map values into pact record
-   for (int i = 0; i < parsedValues.length; i++) {
-   reuse.setField(parsedValues[i], i);
+   if (typeInformation instanceof TupleTypeInfoBase) {
+   // result type is tuple
+   Tuple result = (Tuple) reuse;
+   for (int i = 0; i < parsedValues.length; i++) {
+   result.setField(parsedValues[i], i);
+   }
+   } else {
+   // result type is POJO
+   PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+   for (int i = 0; i < parsedValues.length; i++) {
+   if (fieldsMap[i] == null) {
+   continue;
+   }
+
+   try {
+   int fieldIndex = typeInformation.getFieldIndex(fieldsMap[i]);
+   pojoTypeInfo.getPojoFieldAt(fieldIndex).field.set(reuse, parsedValues[i]);
--- End diff --

Same for the ``field``.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356787#comment-14356787
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26206307
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +291,29 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
}

if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-   // valid parse, map values into pact record
-   for (int i = 0; i < parsedValues.length; i++) {
-   reuse.setField(parsedValues[i], i);
+   if (typeInformation instanceof TupleTypeInfoBase) {
+   // result type is tuple
+   Tuple result = (Tuple) reuse;
+   for (int i = 0; i < parsedValues.length; i++) {
+   result.setField(parsedValues[i], i);
+   }
+   } else {
+   // result type is POJO
+   PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+   for (int i = 0; i < parsedValues.length; i++) {
+   if (fieldsMap[i] == null) {
--- End diff --

May not be null.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356788#comment-14356788
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26206329
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -234,9 +291,29 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws
}

if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-   // valid parse, map values into pact record
-   for (int i = 0; i < parsedValues.length; i++) {
-   reuse.setField(parsedValues[i], i);
+   if (typeInformation instanceof TupleTypeInfoBase) {
+   // result type is tuple
+   Tuple result = (Tuple) reuse;
+   for (int i = 0; i < parsedValues.length; i++) {
+   result.setField(parsedValues[i], i);
+   }
+   } else {
+   // result type is POJO
+   PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+   for (int i = 0; i < parsedValues.length; i++) {
+   if (fieldsMap[i] == null) {
+   continue;
+   }
+
+   try {
+   int fieldIndex = typeInformation.getFieldIndex(fieldsMap[i]);
--- End diff --

We could compute the index upfront and avoid the String look-up.
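A sketch of that precomputation (names are hypothetical): resolve each mapped field name to its POJO field index once, e.g. in `open()`, so the per-record loop only does array lookups. This also gives a natural place to fail on unknown field names.

```java
import java.util.Arrays;
import java.util.List;

public class FieldIndexPrecomputer {

    /** Maps each entry of fieldsMap to its index in pojoFieldNames,
     *  throwing if a name is not a field of the POJO type. */
    public static int[] precompute(String[] fieldsMap, String[] pojoFieldNames) {
        List<String> names = Arrays.asList(pojoFieldNames);
        int[] indexes = new int[fieldsMap.length];
        for (int i = 0; i < fieldsMap.length; i++) {
            int idx = names.indexOf(fieldsMap[i]);
            if (idx < 0) {
                throw new IllegalArgumentException(
                        "Field '" + fieldsMap[i] + "' not found in POJO type");
            }
            indexes[i] = idx;
        }
        return indexes;
    }
}
```

With the resulting `int[]`, `readRecord` can use `pojoFields[indexes[i]]` directly instead of a per-record String lookup.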




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356783#comment-14356783
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26206133
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -152,6 +177,38 @@ public void setFields(boolean[] sourceFieldMask, Class[] fieldTypes) {
public Class[] getFieldTypes() {
return super.getGenericFieldTypes();
}
+
+   public void setFieldsMap(String[] fieldsMap) {
+   Preconditions.checkNotNull(fieldsMap);
+   Preconditions.checkState(typeInformation instanceof PojoTypeInfo);
+
+   PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+
+   String[] fields = pojoTypeInfo.getFieldNames();
+   Class[] fieldTypes = getFieldTypes();
+   this.fieldsMap = Arrays.copyOfRange(fieldsMap, 0, fieldsMap.length);
+
+   boolean[] includeMask = new boolean[fieldsMap.length];
+   Class[] newFieldTypes = new Class[fieldsMap.length];
+
+   for (int i = 0; i < fieldsMap.length; i++) {
+   if (fieldsMap[i] == null) {
--- End diff --

IMO, ``null`` values should not be allowed in the ``fieldsMap``. Can you 
throw an exception in that case?
The ``fieldsMap`` should be a list of fields that are mapped to columns in 
the CSV file. As said before, the columns that are read by the format are 
defined by the ``includeMask``.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356781#comment-14356781
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26205969
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -152,6 +177,38 @@ public void setFields(boolean[] sourceFieldMask, Class[] fieldTypes) {
public Class[] getFieldTypes() {
return super.getGenericFieldTypes();
}
+
+   public void setFieldsMap(String[] fieldsMap) {
+   Preconditions.checkNotNull(fieldsMap);
+   Preconditions.checkState(typeInformation instanceof PojoTypeInfo);
+
+   PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+
+   String[] fields = pojoTypeInfo.getFieldNames();
+   Class[] fieldTypes = getFieldTypes();
+   this.fieldsMap = Arrays.copyOfRange(fieldsMap, 0, fieldsMap.length);
+
+   boolean[] includeMask = new boolean[fieldsMap.length];
+   Class[] newFieldTypes = new Class[fieldsMap.length];
+
+   for (int i = 0; i < fieldsMap.length; i++) {
+   if (fieldsMap[i] == null) {
+   includeMask[i] = false;
+   newFieldTypes[i] = null;
+   continue;
+   }
+
+   for (int j = 0; j < fields.length; j++) {
+   if (fields[j].equals(fieldsMap[i])) {
+   includeMask[i] = true;
+   newFieldTypes[i] = fieldTypes[j];
+   break;
+   }
+   }
--- End diff --

Can you throw an exception if the provided field name was not found in the 
POJO type information?




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356765#comment-14356765
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26205234
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -152,6 +177,38 @@ public void setFields(boolean[] sourceFieldMask, Class[] fieldTypes) {
public Class[] getFieldTypes() {
return super.getGenericFieldTypes();
}
+
+   public void setFieldsMap(String[] fieldsMap) {
+   Preconditions.checkNotNull(fieldsMap);
+   Preconditions.checkState(typeInformation instanceof PojoTypeInfo);
+
+   PojoTypeInfo<OUT> pojoTypeInfo = (PojoTypeInfo<OUT>) typeInformation;
+
+   String[] fields = pojoTypeInfo.getFieldNames();
+   Class[] fieldTypes = getFieldTypes();
+   this.fieldsMap = Arrays.copyOfRange(fieldsMap, 0, fieldsMap.length);
+
+   boolean[] includeMask = new boolean[fieldsMap.length];
--- End diff --

The ``includeMask`` refers to the fields in the CSV file and makes it possible 
to skip fields of the file.
For example, if a line in your file looks like 
``Sam,Smith,09-15-1963,123.123`` and you only want to read the first name 
and the date field, you would set the ``includeMask`` to ``[true, false, 
true]`` (missing fields are treated as ``false``). 
So the ``includeMask`` should not depend on the ``fieldsMap``, but the 
number of ``true`` entries in the ``includeMask`` must be equal to the number 
of fields in the ``fieldsMap``.
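The invariant described above can be illustrated with a small helper (hypothetical, not part of the PR): the mask selects CSV columns, and its number of ``true`` entries must equal the number of mapped POJO fields.

```java
public class IncludeMaskCheck {

    /** Returns true iff the mask selects exactly as many CSV columns
     *  as there are mapped POJO fields. */
    public static boolean isConsistent(boolean[] includeMask, String[] fieldsMap) {
        int selected = 0;
        for (boolean include : includeMask) {
            if (include) {
                selected++;
            }
        }
        return selected == fieldsMap.length;
    }
}
```

For the ``Sam,Smith,09-15-1963,123.123`` example, ``includeMask = [true, false, true]`` paired with two mapped fields (first name and date) is consistent; pairing it with only one mapped field is not.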




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14356725#comment-14356725
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r26203612
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -64,26 +70,45 @@
private transient int commentCount;
 
private transient int invalidLineCount;
+
+   private CompositeType<OUT> typeInformation = null;
+
+   private String[] fieldsMap = null;

+   public CsvInputFormat(Path filePath, TypeInformation<OUT> typeInformation) {
+   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, typeInformation);
+   }

-   public CsvInputFormat(Path filePath) {
-   super(filePath);
-   }   
-   
-   public CsvInputFormat(Path filePath, Class ... types) {
-   this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, 
types);
-   }   
-   
-   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, Class... types) {
+   public CsvInputFormat(Path filePath, String lineDelimiter, String 
fieldDelimiter, TypeInformation typeInformation) {
super(filePath);
 
+   Preconditions.checkArgument(typeInformation instanceof 
CompositeType);
+   this.typeInformation = (CompositeType) typeInformation;
+
setDelimiter(lineDelimiter);
setFieldDelimiter(fieldDelimiter);
 
-   setFieldTypes(types);
+   Class[] classes = new Class[typeInformation.getArity()];
+   for (int i = 0, arity = typeInformation.getArity(); i < arity; 
i++) {
+   classes[i] = 
this.typeInformation.getTypeAt(i).getTypeClass();
+   }
+   setFieldTypes(classes);
+
+   if (typeInformation instanceof PojoTypeInfo) {
+   setFieldsMap(this.typeInformation.getFieldNames());
+   setAccessibleToField();
+   }
}
-   
-   
+
+   public void setAccessibleToField() {
--- End diff --

Can we make this method private?




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14355172#comment-14355172
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-78094247
  
Added the following changes:
* `String[]` parameter for the `pojoType` method in `CsvReader`.
* Refactored the constructor of `CsvInputFormat`.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14353328#comment-14353328
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-77912526
  
@teabot: We've created this JIRA for the feature you've suggested: 
https://issues.apache.org/jira/browse/FLINK-1665




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14348700#comment-14348700
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-77358774
  
Those are very good points! Having an annotation for POJO fields to define 
their order in the PojoTypeInfo (and therefore the order in which the fields 
are mapped to CSV fields) is a nice idea.

The String[] parameter to select POJO fields would still be good because it
- makes the POJO type and the CSV fields independent of each other and
- allows filling only a subset of the POJO fields.

Having a way to provide logic to parse a String into a 
custom/not-natively-supported type is also important for POJOs. I would, 
however, make that independent of the POJO and add it as a parameter to the 
CsvReader. That way you don't need to touch existing POJO code, and the POJO 
again stays independent of the data source (imagine CSV files with different date 
formats).
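A small illustration of the point above, under the assumption that the date column is read as a plain `String` and converted in a separate step; the `parseDate` helper is illustrative, not a Flink API:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Sketch of source-independent date handling: the CSV column arrives as a
// String and is converted per source, so the POJO itself never depends on
// the file's date format.
public class DateConversionDemo {

    public static Date parseDate(String raw, String pattern) {
        try {
            return new SimpleDateFormat(pattern).parse(raw);
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparsable date: " + raw, e);
        }
    }

    public static void main(String[] args) {
        // two files, two formats, one conversion step per source
        Date fromUsFile  = parseDate("09-15-1963", "MM-dd-yyyy");
        Date fromIsoFile = parseDate("1963/09/15", "yyyy/MM/dd");
        System.out.println(fromUsFile.equals(fromIsoFile)); // prints true
    }
}
```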




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14348616#comment-14348616
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user teabot commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-77347164
  
I like that this request adds what I would consider to be a very useful 
feature. Data is often stored in text-delimited formats, and one of the nice 
features of Flink is the ability to work with POJOs. Therefore some mechanism 
to map from delimited files to POJOs is useful.

However, I have found that such mappings are rarely restricted to the 
simple cases. Therefore might I suggest that some other factors be considered 
as part of this request, so as not to limit the usefulness of this API change 
and to provide some extensibility:

* The positions of fields within the delimited file, and how they are mapped to 
POJO properties, could indeed be modelled with a `String[]` as suggested. 
However, this will be arduous to maintain and prone to human error for types 
with a large number of fields. An alternative could be to have a `@Position` 
annotation on the POJO object fields to indicate the CSV column index.
```
  public class MyPojo {
    @Position(0)
    public int id;

    @Position(1)
    public String name;
    ...
```
* Flink + POJOs bring the benefit of richer types to data processing 
pipelines. But it seems to me that this implementation imposes a restriction on 
the range of types that can be used within the target POJO type, negating this 
benefit somewhat. If I want to use a POJO with a `DateTime` field, then I must 
still create my own mapping function to do so. Therefore it would be useful to 
provide some hook into the CSV-POJO mapping process to allow the specification 
of user declared/defined type converters. This would enable users to easily 
map to field types of their choosing. Again, this could be modelled as an 
annotation:
```
  public class MyPojo {
    // DateTimeConverter implemented by the user
    @Converter(type=DateTimeConverter.class, properties="format=/MM/dd:HH;timezone=Europe/London")
    public DateTime id;
    ...
```
You might consider that these additional features are better served by some 
separate type-mapping component or API (and I'd probably agree). But in that 
case, is it then wise to also add a simpler, less flexible form to the core 
Flink API?

Thank you for your time.
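For what it's worth, a minimal sketch of how such a hypothetical `@Position` annotation could be declared and read back via reflection (none of this exists in Flink; all names are made up):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Hypothetical @Position annotation plus the reflection needed to recover
// the CSV column order from an annotated POJO. Not part of Flink.
public class PositionDemo {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface Position {
        int value();
    }

    public static class MyPojo {
        @Position(0)
        public int id;

        @Position(1)
        public String name;
    }

    // columnOrder(...)[i] is the name of the POJO field mapped to CSV column i
    public static String[] columnOrder(Class<?> pojoClass) {
        Field[] fields = pojoClass.getDeclaredFields();
        String[] order = new String[fields.length];
        for (Field f : fields) {
            Position p = f.getAnnotation(Position.class);
            if (p != null) {
                order[p.value()] = f.getName();
            }
        }
        return order;
    }

    public static void main(String[] args) {
        String[] order = columnOrder(MyPojo.class);
        System.out.println(order[0] + "," + order[1]); // prints id,name
    }
}
```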




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14348370#comment-14348370
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r25846058
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -64,15 +66,25 @@
	private transient int commentCount;

	private transient int invalidLineCount;
+
+	private PojoTypeInfo<OUT> pojoTypeInfo = null;


	public CsvInputFormat(Path filePath) {
		super(filePath);
-	}
+	}
+
+	public CsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+		super(filePath);
+
+		Preconditions.checkNotNull(pojoTypeInfo, "The TypeInformation is required for getting the POJO fields.");
+		this.pojoTypeInfo = pojoTypeInfo;
+		setAccessibleToField();
+	}

	public CsvInputFormat(Path filePath, Class<?>... types) {
--- End diff --

Yes, if we go for the more general TypeInformation, we need to check whether 
it is a TupleTypeInfo or a PojoTypeInfo, and we can get rid of one pair of 
constructors. Actually, I think both solutions are fine. 




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14347984#comment-14347984
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-77293135
  
@fhueske Oh, you are right. Currently, users cannot decide the order of fields. 
I will add a parameter to set the order of fields.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14347978#comment-14347978
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r25834522
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -64,15 +66,25 @@
	private transient int commentCount;

	private transient int invalidLineCount;
+
+	private PojoTypeInfo<OUT> pojoTypeInfo = null;


	public CsvInputFormat(Path filePath) {
		super(filePath);
-	}
+	}
+
+	public CsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+		super(filePath);
+
+		Preconditions.checkNotNull(pojoTypeInfo, "The TypeInformation is required for getting the POJO fields.");
+		this.pojoTypeInfo = pojoTypeInfo;
+		setAccessibleToField();
+	}

	public CsvInputFormat(Path filePath, Class<?>... types) {
--- End diff --

@fhueske I think that we can change the constructors to receive a 
TypeInformation instead of a Class<?>[]. How about this?




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14347766#comment-14347766
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r25825316
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -64,15 +66,25 @@
	private transient int commentCount;

	private transient int invalidLineCount;
+
+	private PojoTypeInfo<OUT> pojoTypeInfo = null;


	public CsvInputFormat(Path filePath) {
		super(filePath);
-	}
+	}
+
+	public CsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+		super(filePath);
+
+		Preconditions.checkNotNull(pojoTypeInfo, "The TypeInformation is required for getting the POJO fields.");
+		this.pojoTypeInfo = pojoTypeInfo;
+		setAccessibleToField();
+	}

	public CsvInputFormat(Path filePath, Class<?>... types) {
--- End diff --

Can we "standardize" the constructors and pass a TupleTypeInfo for the 
Tuple case instead of a `Class<?>[]`?




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14347757#comment-14347757
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/426#issuecomment-77272091
  
How are fields in the CSV file mapped to POJO fields? I assume it is the 
order of fields in the POJO type information, right? Is that order the same as 
in the POJO definition, or some other order such as alphanumeric ordering of the 
field names? This might not be obvious to users.
Would it make sense to add a `String[]` to map POJO fields to field 
positions in the CSV file? I.e., `String[] {"name", "age", "zip"}` would map 
the POJO field `name` to the first CSV field, and so on.
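A plain-Java sketch of the proposed mapping, where the i-th entry of the `String[]` names the POJO field that receives the i-th CSV column (the helper and field names are illustrative only):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the String[]-based mapping: pojoFields[i] names the POJO field
// that receives the i-th CSV column. Helper and field names are made up.
public class FieldMappingDemo {

    public static Map<String, String> mapLine(String line, String[] pojoFields) {
        String[] csvFields = line.split(",");
        Map<String, String> values = new LinkedHashMap<>();
        for (int i = 0; i < pojoFields.length; i++) {
            values.put(pojoFields[i], csvFields[i]);
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, String> v = mapLine("Alice,30,12345",
                new String[] {"name", "age", "zip"});
        System.out.println(v); // prints {name=Alice, age=30, zip=12345}
    }
}
```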




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-03-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14346626#comment-14346626
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r25760453
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -235,8 +252,21 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws

	if (parseRecord(parsedValues, bytes, offset, numBytes)) {
		// valid parse, map values into pact record
-		for (int i = 0; i < parsedValues.length; i++) {
-			reuse.setField(parsedValues[i], i);
+		if (pojoTypeInfo == null) {
+			Tuple result = (Tuple) reuse;
+			for (int i = 0; i < parsedValues.length; i++) {
+				result.setField(parsedValues[i], i);
+			}
+		} else {
+			for (int i = 0; i < parsedValues.length; i++) {
+				try {
+					pojoTypeInfo.getPojoFieldAt(i).field.set(reuse, parsedValues[i]);
--- End diff --

@rmetzger Thanks! I modified my implementation to set the fields accessible 
in `CsvInputFormat` and `ScalaCsvInputFormat` and added a test case with private 
fields.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334707#comment-14334707
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r25237861
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -235,8 +252,21 @@ public OUT readRecord(OUT reuse, byte[] bytes, int offset, int numBytes) throws

	if (parseRecord(parsedValues, bytes, offset, numBytes)) {
		// valid parse, map values into pact record
-		for (int i = 0; i < parsedValues.length; i++) {
-			reuse.setField(parsedValues[i], i);
+		if (pojoTypeInfo == null) {
+			Tuple result = (Tuple) reuse;
+			for (int i = 0; i < parsedValues.length; i++) {
+				result.setField(parsedValues[i], i);
+			}
+		} else {
+			for (int i = 0; i < parsedValues.length; i++) {
+				try {
+					pojoTypeInfo.getPojoFieldAt(i).field.set(reuse, parsedValues[i]);
--- End diff --

I'm not 100% sure if this also works for POJOs with private fields.
In your test case, all the fields are public. I'm not sure where exactly we 
are setting the fields accessible, but it might be the case that they are not 
set accessible here. 
Can you add a private field with a getter/setter to the test POJO to 
validate this?
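The concern can be sketched outside Flink with a few lines of reflection; in general, `Field.set` on a private field requires a prior `setAccessible(true)`, which is what a `setAccessibleToField()` step would have to guarantee (class and method names below are made up for illustration):

```java
import java.lang.reflect.Field;

// Reproduces the accessibility question outside Flink: writing a private
// POJO field via reflection is only safe after setAccessible(true), which
// is what a setAccessibleToField() step would guarantee. Names made up.
public class PrivateFieldDemo {

    public static class Pojo {
        private String name; // private, normally reached via getter/setter

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static Pojo fillViaReflection(String value) {
        try {
            Pojo pojo = new Pojo();
            Field f = Pojo.class.getDeclaredField("name");
            // needed whenever the caller could not otherwise access the
            // private field (as for an input format in another package)
            f.setAccessible(true);
            f.set(pojo, value);
            return pojo;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fillViaReflection("Sam").getName()); // prints Sam
    }
}
```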




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334701#comment-14334701
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r25237152
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -82,6 +92,13 @@ public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter

		setFieldTypes(types);
	}
+
+	public CsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter, PojoTypeInfo<OUT> pojoTypeInfo) {
+		this(filePath, lineDelimiter, fieldDelimiter);
+
+		Preconditions.checkNotNull(pojoTypeInfo, "Type information must be required to set values to pojo");
--- End diff --

Same here.




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-02-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14334700#comment-14334700
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/426#discussion_r25237140
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java ---
@@ -64,15 +65,24 @@
	private transient int commentCount;

	private transient int invalidLineCount;
+
+	private PojoTypeInfo<OUT> pojoTypeInfo = null;


	public CsvInputFormat(Path filePath) {
		super(filePath);
-	}
+	}
+
+	public CsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
+		super(filePath);
+
+		Preconditions.checkNotNull(pojoTypeInfo, "Type information must be required to set values to pojo");
--- End diff --

The error message reads a bit weird. How about "The TypeInformation is 
required for getting the POJO fields."




[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14327776#comment-14327776
 ] 

ASF GitHub Bot commented on FLINK-1512:
---

GitHub user chiwanpark opened a pull request:

https://github.com/apache/flink/pull/426

[FLINK-1512] Add CsvReader for reading into POJOs.

This PR contains the following changes.

* `CsvInputFormat` and `ScalaCsvInputFormat` can receive a POJO type as a generic parameter
* Add `pojoType(Class targetType)` to `CsvReader` (Java API)
* Modify the `readCsvFile` method in `ExecutionEnvironment` (Scala API)
* Add unit tests for `CsvInputFormat` and `ScalaCsvInputFormat`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chiwanpark/flink FLINK-1512

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/426.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #426


commit 2463d6b7d244528e1625288e1b780335769f14ee
Author: Chiwan Park 
Date:   2015-02-18T18:27:59Z

[FLINK-1512] [java api] Add CsvReader for reading into POJOs

commit 8fe5f8d1bd402382e6fa93014c5b2fec8e22cbd0
Author: Chiwan Park 
Date:   2015-02-19T17:23:56Z

[FLINK-1512] [scala api] Add CsvReader for reading into POJOs






[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-02-12 Thread Chiwan Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14319492#comment-14319492
 ] 

Chiwan Park commented on FLINK-1512:


[~StephanEwen] Thanks for your explanation. I'll implement this feature and 
send a pull request.



[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-02-12 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317945#comment-14317945
 ] 

Stephan Ewen commented on FLINK-1512:
-

[~chiwanpark] Here is the difference:
 - The {{types}} method lets you supply a list of primitive types, and it will 
construct an n-tuple.
 - The {{tupleType}} method allows you to use a class that is a subclass of an 
n-tuple; it will derive the field types from that class, return 
instances of that class, and put the fields into it. Something like {{class 
MyType extends Tuple3 ...}}
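The distinction can be sketched without Flink on the classpath, using a plain generic pair in place of `TupleXX` (all class names below are made up):

```java
// Plain-Java stand-ins for the two cases: a generic pair playing the role
// of a TupleXX type, and a named subclass playing the role of a tupleType()
// target that fixes the field types. All class names are made up.
public class TupleTypeDemo {

    public static class Tuple2<A, B> {
        public A f0;
        public B f1;
    }

    // analogous to `class MyType extends Tuple3 ...`: field types are fixed
    // by the subclass, so instances of NameAge come back instead of raw tuples
    public static class NameAge extends Tuple2<String, Integer> {
        public String name() { return f0; }
        public int age()     { return f1; }
    }

    public static NameAge of(String name, int age) {
        NameAge r = new NameAge();
        r.f0 = name;
        r.f1 = age;
        return r;
    }

    public static void main(String[] args) {
        NameAge sam = of("Sam", 52);
        System.out.println(sam.name() + "/" + sam.age()); // prints Sam/52
    }
}
```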



[jira] [Commented] (FLINK-1512) Add CsvReader for reading into POJOs.

2015-02-11 Thread Chiwan Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14317644#comment-14317644
 ] 

Chiwan Park commented on FLINK-1512:


I'm trying to implement this feature. I can't understand what the 
difference is between the {{tupleType}} method and the {{types}} method in the 
{{CsvReader}} class. I think the two methods do the same thing. Why are both 
methods in the class?

I propose a method {{targetType(Class typeClass)}} (the name of the method can 
be changed). Users can use this method like the following code.

{code:java}
DataSource items = env.readCsvFile("file-path")
.fieldDelimiter(" ")
.lineDelimiter("\n")
.targetType(Item.class);
{code}
