lirui-apache commented on a change in pull request #16778:
URL: https://github.com/apache/flink/pull/16778#discussion_r688365486
##########
File path:
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
##########
@@ -169,10 +176,15 @@ private MessageType clipParquetSchema(GroupType
parquetSchema) {
if (isCaseSensitive) {
for (int i = 0; i < projectedFields.length; ++i) {
String fieldName = projectedFields[i];
- if (parquetSchema.getFieldIndex(fieldName) < 0) {
- throw new IllegalArgumentException(fieldName + " does not
exist");
+ if (!parquetSchema.containsField(fieldName)) {
+ LOG.warn("%s does not exist in %s", fieldName,
parquetSchema);
Review comment:
I don't think slf4j supports this `%s` string format. Should we use `{}` as the
formatting anchor instead? It would also be better to clarify in the log message
that we'll return `null` for such fields.
##########
File path:
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
##########
@@ -192,7 +204,11 @@ private MessageType clipParquetSchema(GroupType
parquetSchema) {
Type type =
caseInsensitiveFieldMap.get(projectedFields[i].toLowerCase(Locale.ROOT));
if (type == null) {
- throw new IllegalArgumentException(projectedFields[i] + "
does not exist");
+ LOG.warn("%s does not exist in %s", projectedFields[i],
parquetSchema);
Review comment:
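Ditto: slf4j does not support the `%s` format here, so `{}` should be used as the
formatting anchor, and the log message should clarify that `null` is returned
for such fields.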
##########
File path:
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java
##########
@@ -253,6 +253,47 @@ public void testProjection() throws IOException {
});
}
+ @Test
+ public void testProjectionReadUnknownField() throws IOException {
+ int number = 1000;
+ List<Row> records = new ArrayList<>(number);
+ for (int i = 0; i < number; i++) {
+ Integer v = i;
+ records.add(newRow(v));
+ }
+
+ Path testPath =
+ createTempParquetFile(
+ TEMPORARY_FOLDER.newFolder(), PARQUET_SCHEMA, records,
rowGroupSize);
+
+ // test reader
+ LogicalType[] fieldTypes =
+ new LogicalType[] {
+ new DoubleType(), new TinyIntType(), new IntType(), new
VarCharType()
+ };
+ ParquetColumnarRowInputFormat<FileSourceSplit> format =
+ new ParquetColumnarRowInputFormat(
+ new Configuration(),
+ // f99 not exist in parquet file.
+ RowType.of(fieldTypes, new String[] {"f7", "f2", "f4",
"f99"}),
+ 500,
+ false,
+ true);
+
+ AtomicInteger cnt = new AtomicInteger(0);
+ forEachRemaining(
+ format.createReader(
+ EMPTY_CONF, new FileSourceSplit("id", testPath, 0,
Long.MAX_VALUE)),
+ row -> {
+ int i = cnt.get();
+ assertEquals(i, row.getDouble(0), 0);
+ assertEquals((byte) i, row.getByte(1));
+ assertEquals(i, row.getInt(2));
+ assertEquals(true, row.isNullAt(3));
Review comment:
```suggestion
assertTrue(row.isNullAt(3));
```
##########
File path:
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
##########
@@ -361,8 +377,11 @@ private boolean nextBatch(ParquetReaderBatch<T> batch)
throws IOException {
int num = (int) Math.min(batchSize, totalCountLoadedSoFar -
rowsReturned);
for (int i = 0; i < columnReaders.length; ++i) {
- //noinspection unchecked
Review comment:
Why do we remove this line?
##########
File path:
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java
##########
@@ -253,6 +253,47 @@ public void testProjection() throws IOException {
});
}
+ @Test
+ public void testProjectionReadUnknownField() throws IOException {
+ int number = 1000;
+ List<Row> records = new ArrayList<>(number);
+ for (int i = 0; i < number; i++) {
+ Integer v = i;
+ records.add(newRow(v));
+ }
+
+ Path testPath =
+ createTempParquetFile(
+ TEMPORARY_FOLDER.newFolder(), PARQUET_SCHEMA, records,
rowGroupSize);
+
+ // test reader
+ LogicalType[] fieldTypes =
+ new LogicalType[] {
+ new DoubleType(), new TinyIntType(), new IntType(), new
VarCharType()
+ };
+ ParquetColumnarRowInputFormat<FileSourceSplit> format =
+ new ParquetColumnarRowInputFormat(
Review comment:
```suggestion
new ParquetColumnarRowInputFormat<>(
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]