This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 011c072196 Enabled the method 'next(GenericRow row)' in the CSVRecordReader while using the line iterator (#11581)
011c072196 is described below

commit 011c072196bc7959155a34f4ae9877abd676669f
Author: Ragesh Rajagopalan <[email protected]>
AuthorDate: Wed Sep 13 08:38:38 2023 -0700

    Enabled the method 'next(GenericRow row)' in the CSVRecordReader while using the line iterator (#11581)
    
    * Enabled the method 'next(GenericRow row)' in the CSVRecordReader while using the line iterator
    
    * trigger build
---
 .../plugin/inputformat/csv/CSVRecordReader.java    |  3 +-
 .../inputformat/csv/CSVRecordReaderTest.java       | 74 ++++++++++++----------
 2 files changed, 41 insertions(+), 36 deletions(-)

diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index 78423e408f..49339ac218 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -225,8 +225,7 @@ public class CSVRecordReader implements RecordReader {
   public GenericRow next(GenericRow reuse)
       throws IOException {
     if (_useLineIterator) {
-      throw new UnsupportedOperationException("Method signature 
'next(GenericRow genericRow)'not supported while using "
-          + "the config option 'skipUnParseableLines'.");
+      reuse.init(_nextRecord);
     } else {
       CSVRecord record = _iterator.next();
       _recordExtractor.extract(record, reuse);
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index b7012a566c..e174ced7e5 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -230,12 +230,12 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setSkipUnParseableLines(true);
     readerConfig.setCommentMarker('#');
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(3, genericRows.size());
 
     // test using default CSVRecordReader
     readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
     Assert.assertEquals(3, genericRows.size());
   }
 
@@ -248,12 +248,12 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     // test using line iterator
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(5, genericRows.size());
 
     // test using default CSVRecordReader
     readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
     Assert.assertEquals(5, genericRows.size());
   }
 
@@ -266,12 +266,12 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     // test using line iterator
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(2, genericRows.size());
 
     // test using default CSVRecordReader
     readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
     Assert.assertEquals(2, genericRows.size());
   }
 
@@ -285,12 +285,12 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setSkipUnParseableLines(true);
     readerConfig.setHeader("id,name");
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(3, genericRows.size());
 
     // test using default CSVRecordReader
     readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
     Assert.assertEquals(3, genericRows.size());
   }
 
@@ -303,12 +303,12 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     // test using line iterator
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(2, genericRows.size());
 
     // test using default CSVRecordReader
     readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
     Assert.assertEquals(2, genericRows.size());
   }
 
@@ -320,7 +320,7 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
 
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(1, genericRows.size());
   }
 
@@ -331,7 +331,7 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     File dataFile = new File(uri);
 
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
-    readCSVRecords(dataFile, readerConfig, false);
+    readCSVRecords(dataFile, readerConfig, null, false);
   }
 
   @Test
@@ -345,7 +345,7 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     readerConfig.setCommentMarker('#');
     readerConfig.setIgnoreEmptyLines(true);
 
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(7, genericRows.size());
   }
 
@@ -359,7 +359,7 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     readerConfig.setCommentMarker('#');
     readerConfig.setIgnoreEmptyLines(true);
 
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, new 
GenericRow(), false);
     Assert.assertEquals(7, genericRows.size());
   }
 
@@ -373,10 +373,10 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     readerConfig.setSkipUnParseableLines(true);
     readerConfig.setCommentMarker('#');
     readerConfig.setIgnoreEmptyLines(true);
-    readCSVRecords(dataFile, readerConfig, true);
+    readCSVRecords(dataFile, readerConfig, null, true);
 
     // Start reading again; results should be same
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, new 
GenericRow(), false);
     Assert.assertEquals(7, genericRows.size());
   }
 
@@ -389,10 +389,10 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setCommentMarker('#');
     readerConfig.setIgnoreEmptyLines(true);
-    readCSVRecords(dataFile, readerConfig, true);
+    readCSVRecords(dataFile, readerConfig, null, true);
 
     // Start reading again; results should be same
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(7, genericRows.size());
   }
 
@@ -407,12 +407,12 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     readerConfig.setHeader("firstName,lastName,id");
     readerConfig.setSkipHeader(true);
     readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(3, genericRows.size());
 
     // test using default CSVRecordReader
     readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
     Assert.assertEquals(3, genericRows.size());
   }
 
@@ -426,12 +426,12 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setDelimiter('|');
     readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(3, genericRows.size());
 
     // test using default CSVRecordReader
     readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
     Assert.assertEquals(3, genericRows.size());
   }
 
@@ -445,13 +445,13 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setSkipUnParseableLines(true);
     readerConfig.setIgnoreSurroundingSpaces(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(3, genericRows.size());
     validateSpaceAroundHeadersAreTrimmed(dataFile, readerConfig);
 
     // test using default CSVRecordReader
     readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
     Assert.assertEquals(3, genericRows.size());
     validateSpaceAroundHeadersAreTrimmed(dataFile, readerConfig);
   }
@@ -466,13 +466,13 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setSkipUnParseableLines(true);
     readerConfig.setIgnoreSurroundingSpaces(false);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(3, genericRows.size());
     validateSpaceAroundHeadersAreRetained(dataFile, readerConfig);
 
     // test using default CSVRecordReader
     readerConfig.setSkipUnParseableLines(false);
-    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
     Assert.assertEquals(3, genericRows.size());
     validateSpaceAroundHeadersAreRetained(dataFile, readerConfig);
   }
@@ -487,18 +487,18 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     readerConfig.setSkipUnParseableLines(true);
     readerConfig.setHeader("id,name");
     readerConfig.setSkipHeader(true);
-    readCSVRecords(dataFile, readerConfig, true);
+    readCSVRecords(dataFile, readerConfig, new GenericRow(), true);
 
     // Start reading again; results should be same
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(3, genericRows.size());
 
     // test using default CSVRecordReader
     readerConfig.setSkipUnParseableLines(false);
-    readCSVRecords(dataFile, readerConfig, true);
+    readCSVRecords(dataFile, readerConfig, null, true);
 
     // Start reading again; results should be same
-    genericRows = readCSVRecords(dataFile, readerConfig, false);
+    genericRows = readCSVRecords(dataFile, readerConfig, null, false);
     Assert.assertEquals(3, genericRows.size());
   }
 
@@ -511,7 +511,7 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     // test using line iterator
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(2, genericRows.size());
 
     // Note: The default CSVRecordReader cannot handle unparseable rows
@@ -526,20 +526,26 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     // test using line iterator
     CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
     readerConfig.setSkipUnParseableLines(true);
-    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
false);
+    List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, 
null, false);
     Assert.assertEquals(0, genericRows.size());
 
     // Note: The default CSVRecordReader cannot handle unparseable rows
   }
 
-  private List<GenericRow> readCSVRecords(File dataFile, CSVRecordReaderConfig 
readerConfig, boolean rewind)
+  private List<GenericRow> readCSVRecords(File dataFile,
+      CSVRecordReaderConfig readerConfig, GenericRow genericRow, boolean 
rewind)
       throws IOException {
     List<GenericRow> genericRows = new ArrayList<>();
 
     try (CSVRecordReader recordReader = new CSVRecordReader()) {
       recordReader.init(dataFile, null, readerConfig);
+      GenericRow reuse = new GenericRow();
       while (recordReader.hasNext()) {
-        GenericRow genericRow = recordReader.next();
+        if (genericRow != null) {
+          recordReader.next(reuse);
+        } else {
+          recordReader.next();
+        }
         genericRows.add(genericRow);
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to