This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 011c072196 Enabled the method 'next(GenericRow row)' in the
CSVRecordReader while using the line iterator (#11581)
011c072196 is described below
commit 011c072196bc7959155a34f4ae9877abd676669f
Author: Ragesh Rajagopalan <[email protected]>
AuthorDate: Wed Sep 13 08:38:38 2023 -0700
Enabled the method 'next(GenericRow row)' in the CSVRecordReader while
using the line iterator (#11581)
* Enabled the method 'next(GenericRow row)' in the CSVRecordReader while
using the line iterator
* trigger build
---
.../plugin/inputformat/csv/CSVRecordReader.java | 3 +-
.../inputformat/csv/CSVRecordReaderTest.java | 74 ++++++++++++----------
2 files changed, 41 insertions(+), 36 deletions(-)
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index 78423e408f..49339ac218 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -225,8 +225,7 @@ public class CSVRecordReader implements RecordReader {
public GenericRow next(GenericRow reuse)
throws IOException {
if (_useLineIterator) {
- throw new UnsupportedOperationException("Method signature
'next(GenericRow genericRow)'not supported while using "
- + "the config option 'skipUnParseableLines'.");
+ reuse.init(_nextRecord);
} else {
CSVRecord record = _iterator.next();
_recordExtractor.extract(record, reuse);
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index b7012a566c..e174ced7e5 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -230,12 +230,12 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setSkipUnParseableLines(true);
readerConfig.setCommentMarker('#');
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(3, genericRows.size());
// test using default CSVRecordReader
readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, false);
+ genericRows = readCSVRecords(dataFile, readerConfig, null, false);
Assert.assertEquals(3, genericRows.size());
}
@@ -248,12 +248,12 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
// test using line iterator
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(5, genericRows.size());
// test using default CSVRecordReader
readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, false);
+ genericRows = readCSVRecords(dataFile, readerConfig, null, false);
Assert.assertEquals(5, genericRows.size());
}
@@ -266,12 +266,12 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
// test using line iterator
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(2, genericRows.size());
// test using default CSVRecordReader
readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, false);
+ genericRows = readCSVRecords(dataFile, readerConfig, null, false);
Assert.assertEquals(2, genericRows.size());
}
@@ -285,12 +285,12 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setSkipUnParseableLines(true);
readerConfig.setHeader("id,name");
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(3, genericRows.size());
// test using default CSVRecordReader
readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, false);
+ genericRows = readCSVRecords(dataFile, readerConfig, null, false);
Assert.assertEquals(3, genericRows.size());
}
@@ -303,12 +303,12 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
// test using line iterator
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(2, genericRows.size());
// test using default CSVRecordReader
readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, false);
+ genericRows = readCSVRecords(dataFile, readerConfig, null, false);
Assert.assertEquals(2, genericRows.size());
}
@@ -320,7 +320,7 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(1, genericRows.size());
}
@@ -331,7 +331,7 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
File dataFile = new File(uri);
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
- readCSVRecords(dataFile, readerConfig, false);
+ readCSVRecords(dataFile, readerConfig, null, false);
}
@Test
@@ -345,7 +345,7 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
readerConfig.setCommentMarker('#');
readerConfig.setIgnoreEmptyLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(7, genericRows.size());
}
@@ -359,7 +359,7 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
readerConfig.setCommentMarker('#');
readerConfig.setIgnoreEmptyLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, new
GenericRow(), false);
Assert.assertEquals(7, genericRows.size());
}
@@ -373,10 +373,10 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
readerConfig.setSkipUnParseableLines(true);
readerConfig.setCommentMarker('#');
readerConfig.setIgnoreEmptyLines(true);
- readCSVRecords(dataFile, readerConfig, true);
+ readCSVRecords(dataFile, readerConfig, null, true);
// Start reading again; results should be same
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig, new
GenericRow(), false);
Assert.assertEquals(7, genericRows.size());
}
@@ -389,10 +389,10 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setCommentMarker('#');
readerConfig.setIgnoreEmptyLines(true);
- readCSVRecords(dataFile, readerConfig, true);
+ readCSVRecords(dataFile, readerConfig, null, true);
// Start reading again; results should be same
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(7, genericRows.size());
}
@@ -407,12 +407,12 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
readerConfig.setHeader("firstName,lastName,id");
readerConfig.setSkipHeader(true);
readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(3, genericRows.size());
// test using default CSVRecordReader
readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, false);
+ genericRows = readCSVRecords(dataFile, readerConfig, null, false);
Assert.assertEquals(3, genericRows.size());
}
@@ -426,12 +426,12 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setDelimiter('|');
readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(3, genericRows.size());
// test using default CSVRecordReader
readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, false);
+ genericRows = readCSVRecords(dataFile, readerConfig, null, false);
Assert.assertEquals(3, genericRows.size());
}
@@ -445,13 +445,13 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setSkipUnParseableLines(true);
readerConfig.setIgnoreSurroundingSpaces(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(3, genericRows.size());
validateSpaceAroundHeadersAreTrimmed(dataFile, readerConfig);
// test using default CSVRecordReader
readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, false);
+ genericRows = readCSVRecords(dataFile, readerConfig, null, false);
Assert.assertEquals(3, genericRows.size());
validateSpaceAroundHeadersAreTrimmed(dataFile, readerConfig);
}
@@ -466,13 +466,13 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setSkipUnParseableLines(true);
readerConfig.setIgnoreSurroundingSpaces(false);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(3, genericRows.size());
validateSpaceAroundHeadersAreRetained(dataFile, readerConfig);
// test using default CSVRecordReader
readerConfig.setSkipUnParseableLines(false);
- genericRows = readCSVRecords(dataFile, readerConfig, false);
+ genericRows = readCSVRecords(dataFile, readerConfig, null, false);
Assert.assertEquals(3, genericRows.size());
validateSpaceAroundHeadersAreRetained(dataFile, readerConfig);
}
@@ -487,18 +487,18 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
readerConfig.setSkipUnParseableLines(true);
readerConfig.setHeader("id,name");
readerConfig.setSkipHeader(true);
- readCSVRecords(dataFile, readerConfig, true);
+ readCSVRecords(dataFile, readerConfig, new GenericRow(), true);
// Start reading again; results should be same
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(3, genericRows.size());
// test using default CSVRecordReader
readerConfig.setSkipUnParseableLines(false);
- readCSVRecords(dataFile, readerConfig, true);
+ readCSVRecords(dataFile, readerConfig, null, true);
// Start reading again; results should be same
- genericRows = readCSVRecords(dataFile, readerConfig, false);
+ genericRows = readCSVRecords(dataFile, readerConfig, null, false);
Assert.assertEquals(3, genericRows.size());
}
@@ -511,7 +511,7 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
// test using line iterator
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(2, genericRows.size());
// Note: The default CSVRecordReader cannot handle unparseable rows
@@ -526,20 +526,26 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
// test using line iterator
CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
readerConfig.setSkipUnParseableLines(true);
- List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
false);
+ List<GenericRow> genericRows = readCSVRecords(dataFile, readerConfig,
null, false);
Assert.assertEquals(0, genericRows.size());
// Note: The default CSVRecordReader cannot handle unparseable rows
}
- private List<GenericRow> readCSVRecords(File dataFile, CSVRecordReaderConfig
readerConfig, boolean rewind)
+ private List<GenericRow> readCSVRecords(File dataFile,
+ CSVRecordReaderConfig readerConfig, GenericRow genericRow, boolean
rewind)
throws IOException {
List<GenericRow> genericRows = new ArrayList<>();
try (CSVRecordReader recordReader = new CSVRecordReader()) {
recordReader.init(dataFile, null, readerConfig);
+ GenericRow reuse = new GenericRow();
while (recordReader.hasNext()) {
- GenericRow genericRow = recordReader.next();
+ if (genericRow != null) {
+ recordReader.next(reuse);
+ } else {
+ recordReader.next();
+ }
genericRows.add(genericRow);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]