This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new b6801d5  [FLINK-14953][formats] use table type to build parquet FilterPredicate
b6801d5 is described below

commit b6801d50ade388b28d349711fdf75e9f75b562da
Author: hpeter <hpe...@uber.com>
AuthorDate: Mon Nov 18 16:57:40 2019 -0800

    [FLINK-14953][formats] use table type to build parquet FilterPredicate
    
    This closes #10371
    
    (cherry picked from commit b5d958d7bfabbc3ef7e75efdcf61be43ad4044be)
---
 .../flink/formats/parquet/ParquetTableSource.java   | 21 ++++++++++++++++-----
 .../parquet/utils/ParquetSchemaConverter.java       |  4 ++--
 .../formats/parquet/ParquetTableSourceITCase.java   |  2 +-
 .../flink/formats/parquet/utils/TestUtil.java       |  1 +
 4 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
index 2e3a6cf..a8d88fd 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
@@ -58,8 +58,11 @@ import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
 import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
 import org.apache.parquet.filter2.predicate.Operators.IntColumn;
 import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.io.InvalidRecordException;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -447,8 +450,16 @@ public class ParquetTableSource
 
        @Nullable
        private Tuple2<Column, Comparable> extractColumnAndLiteral(BinaryComparison comp) {
-               TypeInformation<?> typeInfo = getLiteralType(comp);
                String columnName = getColumnName(comp);
+               ColumnPath columnPath = ColumnPath.fromDotString(columnName);
+               TypeInformation<?> typeInfo = null;
+               try {
+                       Type type = parquetSchema.getType(columnPath.toArray());
+                       typeInfo = ParquetSchemaConverter.convertParquetTypeToTypeInfo(type);
+               } catch (InvalidRecordException e) {
+                       LOG.error("Pushed predicate on undefined field name {} in schema", columnName);
+                       return null;
+               }
 
                // fetch literal and ensure it is comparable
                Object value = getLiteral(comp);
@@ -463,15 +474,15 @@ public class ParquetTableSource
                if (typeInfo == BasicTypeInfo.BYTE_TYPE_INFO ||
                        typeInfo == BasicTypeInfo.SHORT_TYPE_INFO ||
                        typeInfo == BasicTypeInfo.INT_TYPE_INFO) {
-                       return new Tuple2<>(FilterApi.intColumn(columnName), (Integer) value);
+                       return new Tuple2<>(FilterApi.intColumn(columnName), ((Number) value).intValue());
                } else if (typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
-                       return new Tuple2<>(FilterApi.longColumn(columnName), (Long) value);
+                       return new Tuple2<>(FilterApi.longColumn(columnName), ((Number) value).longValue());
                } else if (typeInfo == BasicTypeInfo.FLOAT_TYPE_INFO) {
-                       return new Tuple2<>(FilterApi.floatColumn(columnName), (Float) value);
+                       return new Tuple2<>(FilterApi.floatColumn(columnName), ((Number) value).floatValue());
                } else if (typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
                        return new Tuple2<>(FilterApi.booleanColumn(columnName), (Boolean) value);
                } else if (typeInfo == BasicTypeInfo.DOUBLE_TYPE_INFO) {
-                       return new Tuple2<>(FilterApi.doubleColumn(columnName), (Double) value);
+                       return new Tuple2<>(FilterApi.doubleColumn(columnName), ((Number) value).doubleValue());
                } else if (typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
                        return new Tuple2<>(FilterApi.binaryColumn(columnName), Binary.fromString((String) value));
                } else {
                } else {
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
index 084c060..326931d 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
@@ -71,7 +71,7 @@ public class ParquetSchemaConverter {
                return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL, legacyMode);
        }
 
-       private static TypeInformation<?> convertFields(List<Type> parquetFields) {
+       public static TypeInformation<?> convertFields(List<Type> parquetFields) {
                List<TypeInformation<?>> types = new ArrayList<>();
                List<String> names = new ArrayList<>();
                for (Type field : parquetFields) {
@@ -89,7 +89,7 @@ public class ParquetSchemaConverter {
                        names.toArray(new String[0]));
        }
 
-       private static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldType) {
+       public static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldType) {
                TypeInformation<?> typeInfo;
                if (fieldType.isPrimitive()) {
                        OriginalType originalType = fieldType.getOriginalType();
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
index d8eba5e..6d5049f 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
@@ -84,7 +84,7 @@ public class ParquetTableSourceITCase extends MultipleProgramsTestBase {
                batchTableEnvironment.registerTableSource("ParquetTable", tableSource);
                String query =
                        "SELECT foo " +
-                       "FROM ParquetTable WHERE bar.spam >= 30 AND CARDINALITY(arr) >= 1 AND arr[1] <= 50";
+                       "FROM ParquetTable WHERE foo >= 1 AND bar.spam >= 30 AND CARDINALITY(arr) >= 1 AND arr[1] <= 50";
 
                Table table = batchTableEnvironment.sqlQuery(query);
                DataSet<Row> dataSet = batchTableEnvironment.toDataSet(table, Row.class);
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
index 6b5cf2a..6a4f176 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
@@ -210,6 +210,7 @@ public class TestUtil {
                        stringArray.add("String");
 
                        final NestedRecord nestedRecord = NestedRecord.newBuilder()
+                               .setFoo(1L)
                                .setBar(bar)
                                .setNestedArray(nestedArray)
                                .setStrArray(stringArray)

Reply via email to