lirui-apache commented on a change in pull request #16778:
URL: https://github.com/apache/flink/pull/16778#discussion_r688365486



##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
##########
@@ -169,10 +176,15 @@ private MessageType clipParquetSchema(GroupType 
parquetSchema) {
         if (isCaseSensitive) {
             for (int i = 0; i < projectedFields.length; ++i) {
                 String fieldName = projectedFields[i];
-                if (parquetSchema.getFieldIndex(fieldName) < 0) {
-                    throw new IllegalArgumentException(fieldName + " does not 
exist");
+                if (!parquetSchema.containsField(fieldName)) {
+                    LOG.warn("%s does not exist in %s", fieldName, 
parquetSchema);

Review comment:
       I don't think slf4j supports this string format. Should use `{}` as the 
formatting anchor? And it's better to clarify in the log that we'll return 
`null` for such fields.

##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
##########
@@ -192,7 +204,11 @@ private MessageType clipParquetSchema(GroupType 
parquetSchema) {
                 Type type =
                         
caseInsensitiveFieldMap.get(projectedFields[i].toLowerCase(Locale.ROOT));
                 if (type == null) {
-                    throw new IllegalArgumentException(projectedFields[i] + " 
does not exist");
+                    LOG.warn("%s does not exist in %s", projectedFields[i], 
parquetSchema);

Review comment:
       ditto

##########
File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java
##########
@@ -253,6 +253,47 @@ public void testProjection() throws IOException {
                 });
     }
 
+    @Test
+    public void testProjectionReadUnknownField() throws IOException {
+        int number = 1000;
+        List<Row> records = new ArrayList<>(number);
+        for (int i = 0; i < number; i++) {
+            Integer v = i;
+            records.add(newRow(v));
+        }
+
+        Path testPath =
+                createTempParquetFile(
+                        TEMPORARY_FOLDER.newFolder(), PARQUET_SCHEMA, records, 
rowGroupSize);
+
+        // test reader
+        LogicalType[] fieldTypes =
+                new LogicalType[] {
+                    new DoubleType(), new TinyIntType(), new IntType(), new 
VarCharType()
+                };
+        ParquetColumnarRowInputFormat<FileSourceSplit> format =
+                new ParquetColumnarRowInputFormat(
+                        new Configuration(),
+                        // f99 not exist in parquet file.
+                        RowType.of(fieldTypes, new String[] {"f7", "f2", "f4", 
"f99"}),
+                        500,
+                        false,
+                        true);
+
+        AtomicInteger cnt = new AtomicInteger(0);
+        forEachRemaining(
+                format.createReader(
+                        EMPTY_CONF, new FileSourceSplit("id", testPath, 0, 
Long.MAX_VALUE)),
+                row -> {
+                    int i = cnt.get();
+                    assertEquals(i, row.getDouble(0), 0);
+                    assertEquals((byte) i, row.getByte(1));
+                    assertEquals(i, row.getInt(2));
+                    assertEquals(true, row.isNullAt(3));

Review comment:
       ```suggestion
                       assertTrue(row.isNullAt(3));
   ```

##########
File path: 
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetVectorizedInputFormat.java
##########
@@ -361,8 +377,11 @@ private boolean nextBatch(ParquetReaderBatch<T> batch) 
throws IOException {
 
             int num = (int) Math.min(batchSize, totalCountLoadedSoFar - 
rowsReturned);
             for (int i = 0; i < columnReaders.length; ++i) {
-                //noinspection unchecked

Review comment:
       Why do we remove this line?

##########
File path: 
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java
##########
@@ -253,6 +253,47 @@ public void testProjection() throws IOException {
                 });
     }
 
+    @Test
+    public void testProjectionReadUnknownField() throws IOException {
+        int number = 1000;
+        List<Row> records = new ArrayList<>(number);
+        for (int i = 0; i < number; i++) {
+            Integer v = i;
+            records.add(newRow(v));
+        }
+
+        Path testPath =
+                createTempParquetFile(
+                        TEMPORARY_FOLDER.newFolder(), PARQUET_SCHEMA, records, 
rowGroupSize);
+
+        // test reader
+        LogicalType[] fieldTypes =
+                new LogicalType[] {
+                    new DoubleType(), new TinyIntType(), new IntType(), new 
VarCharType()
+                };
+        ParquetColumnarRowInputFormat<FileSourceSplit> format =
+                new ParquetColumnarRowInputFormat(

Review comment:
       ```suggestion
                   new ParquetColumnarRowInputFormat<>(
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to