Repository: flink
Updated Branches:
  refs/heads/master 033c69f94 -> 1b42b6206


[FLINK-1512] [tests] Add integration tests for CsvReader

This closes #426


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/43ac967a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/43ac967a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/43ac967a

Branch: refs/heads/master
Commit: 43ac967acb589790f5b3befd6f932e325d4ba681
Parents: 7a6f296
Author: Chiwan Park <chiwanp...@icloud.com>
Authored: Wed Mar 25 15:22:43 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Mar 25 20:38:59 2015 +0100

----------------------------------------------------------------------
 .../flink/api/scala/io/CsvInputFormatTest.scala | 68 +++++++++-----------
 1 file changed, 32 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/43ac967a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
index 4bcd35a..0d74515 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CsvInputFormatTest.scala
@@ -413,6 +413,31 @@ class CsvInputFormatTest {
 
   case class CaseClassItem(field1: Int, field2: String, field3: Double)
 
+  private def validatePOJOItem(format: ScalaCsvInputFormat[POJOItem]): Unit = {
+    var result = new POJOItem()
+    result = format.nextRecord(result)
+    assertEquals(123, result.field1)
+    assertEquals("HELLO", result.field2)
+    assertEquals(3.123, result.field3, 0.001)
+
+    result = format.nextRecord(result)
+    assertEquals(456, result.field1)
+    assertEquals("ABC", result.field2)
+    assertEquals(1.234, result.field3, 0.001)
+  }
+
+  private def validateCaseClassItem(format: ScalaCsvInputFormat[CaseClassItem]): Unit = {
+    var result = format.nextRecord(null)
+    assertEquals(123, result.field1)
+    assertEquals("HELLO", result.field2)
+    assertEquals(3.123, result.field3, 0.001)
+
+    result = format.nextRecord(null)
+    assertEquals(456, result.field1)
+    assertEquals("ABC", result.field2)
+    assertEquals(1.234, result.field3, 0.001)
+  }
+
   @Test
   def testPOJOType(): Unit = {
     val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
@@ -425,16 +450,7 @@ class CsvInputFormatTest {
     format.configure(new Configuration)
     format.open(tempFile)
 
-    var result = new POJOItem()
-    result = format.nextRecord(result)
-    assertEquals(123, result.field1)
-    assertEquals("HELLO", result.field2)
-    assertEquals(3.123, result.field3, 0.001)
-
-    result = format.nextRecord(result)
-    assertEquals(456, result.field1)
-    assertEquals("ABC", result.field2)
-    assertEquals(1.234, result.field3, 0.001)
+    validatePOJOItem(format)
   }
 
   @Test
@@ -449,15 +465,7 @@ class CsvInputFormatTest {
     format.configure(new Configuration)
     format.open(tempFile)
 
-    var result = format.nextRecord(null)
-    assertEquals(123, result.field1)
-    assertEquals("HELLO", result.field2)
-    assertEquals(3.123, result.field3, 0.001)
-
-    result = format.nextRecord(null)
-    assertEquals(456, result.field1)
-    assertEquals("ABC", result.field2)
-    assertEquals(1.234, result.field3, 0.001)
+    validateCaseClassItem(format)
   }
 
   @Test
@@ -474,36 +482,24 @@ class CsvInputFormatTest {
     format.configure(new Configuration)
     format.open(tempFile)
 
-    var result = new POJOItem()
-    result = format.nextRecord(result)
-    assertEquals(123, result.field1)
-    assertEquals("HELLO", result.field2)
-    assertEquals(3.123, result.field3, 0.001)
-
-    result = format.nextRecord(result)
-    assertEquals(456, result.field1)
-    assertEquals("ABC", result.field2)
-    assertEquals(1.234, result.field3, 0.001)
+    validatePOJOItem(format)
   }
   
   @Test
   def testPOJOTypeWithFieldSubsetAndDataSubset(): Unit = {
-    val fileContent = "123,HELLO,3.123\n" + "456,ABC,1.234"
+    val fileContent = "HELLO,123,NODATA,3.123,NODATA\n" + "ABC,456,NODATA,1.234,NODATA"
     val tempFile = createTempFile(fileContent)
     val typeInfo: TypeInformation[POJOItem] = createTypeInformation[POJOItem]
     val format = new ScalaCsvInputFormat[POJOItem](PATH, typeInfo)
 
     format.setDelimiter('\n')
     format.setFieldDelimiter(',')
-    format.setFields(Array(false, true), Array(classOf[String]): Array[Class[_]])
+    format.setFields(Array(true, true, false, true, false),
+      Array(classOf[String], classOf[Integer], classOf[java.lang.Double]): Array[Class[_]])
     format.setOrderOfPOJOFields(Array("field2", "field1", "field3"))
     format.configure(new Configuration)
     format.open(tempFile)
 
-    var result = format.nextRecord(new POJOItem())
-    assertEquals("HELLO", result.field2)
-
-    result = format.nextRecord(result)
-    assertEquals("ABC", result.field2)
+    validatePOJOItem(format)
   }
 }

Reply via email to