This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new b6801d5 [FLINK-14953][formats] use table type to build parquet FilterPredicate b6801d5 is described below commit b6801d50ade388b28d349711fdf75e9f75b562da Author: hpeter <hpe...@uber.com> AuthorDate: Mon Nov 18 16:57:40 2019 -0800 [FLINK-14953][formats] use table type to build parquet FilterPredicate This closes #10371 (cherry picked from commit b5d958d7bfabbc3ef7e75efdcf61be43ad4044be) --- .../flink/formats/parquet/ParquetTableSource.java | 21 ++++++++++++++++----- .../parquet/utils/ParquetSchemaConverter.java | 4 ++-- .../formats/parquet/ParquetTableSourceITCase.java | 2 +- .../flink/formats/parquet/utils/TestUtil.java | 1 + 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java index 2e3a6cf..a8d88fd 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java @@ -58,8 +58,11 @@ import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; import org.apache.parquet.filter2.predicate.Operators.FloatColumn; import org.apache.parquet.filter2.predicate.Operators.IntColumn; import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.io.InvalidRecordException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -447,8 +450,16 @@ public class ParquetTableSource @Nullable private Tuple2<Column, Comparable> extractColumnAndLiteral(BinaryComparison comp) { - TypeInformation<?> typeInfo = getLiteralType(comp); 
String columnName = getColumnName(comp); + ColumnPath columnPath = ColumnPath.fromDotString(columnName); + TypeInformation<?> typeInfo = null; + try { + Type type = parquetSchema.getType(columnPath.toArray()); + typeInfo = ParquetSchemaConverter.convertParquetTypeToTypeInfo(type); + } catch (InvalidRecordException e) { + LOG.error("Pushed predicate on undefined field name {} in schema", columnName); + return null; + } // fetch literal and ensure it is comparable Object value = getLiteral(comp); @@ -463,15 +474,15 @@ public class ParquetTableSource if (typeInfo == BasicTypeInfo.BYTE_TYPE_INFO || typeInfo == BasicTypeInfo.SHORT_TYPE_INFO || typeInfo == BasicTypeInfo.INT_TYPE_INFO) { - return new Tuple2<>(FilterApi.intColumn(columnName), (Integer) value); + return new Tuple2<>(FilterApi.intColumn(columnName), ((Number) value).intValue()); } else if (typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { - return new Tuple2<>(FilterApi.longColumn(columnName), (Long) value); + return new Tuple2<>(FilterApi.longColumn(columnName), ((Number) value).longValue()); } else if (typeInfo == BasicTypeInfo.FLOAT_TYPE_INFO) { - return new Tuple2<>(FilterApi.floatColumn(columnName), (Float) value); + return new Tuple2<>(FilterApi.floatColumn(columnName), ((Number) value).floatValue()); } else if (typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO) { return new Tuple2<>(FilterApi.booleanColumn(columnName), (Boolean) value); } else if (typeInfo == BasicTypeInfo.DOUBLE_TYPE_INFO) { - return new Tuple2<>(FilterApi.doubleColumn(columnName), (Double) value); + return new Tuple2<>(FilterApi.doubleColumn(columnName), ((Number) value).doubleValue()); } else if (typeInfo == BasicTypeInfo.STRING_TYPE_INFO) { return new Tuple2<>(FilterApi.binaryColumn(columnName), Binary.fromString((String) value)); } else { diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java index 084c060..326931d 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java @@ -71,7 +71,7 @@ public class ParquetSchemaConverter { return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL, legacyMode); } - private static TypeInformation<?> convertFields(List<Type> parquetFields) { + public static TypeInformation<?> convertFields(List<Type> parquetFields) { List<TypeInformation<?>> types = new ArrayList<>(); List<String> names = new ArrayList<>(); for (Type field : parquetFields) { @@ -89,7 +89,7 @@ public class ParquetSchemaConverter { names.toArray(new String[0])); } - private static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldType) { + public static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldType) { TypeInformation<?> typeInfo; if (fieldType.isPrimitive()) { OriginalType originalType = fieldType.getOriginalType(); diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java index d8eba5e..6d5049f 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java @@ -84,7 +84,7 @@ public class ParquetTableSourceITCase extends MultipleProgramsTestBase { batchTableEnvironment.registerTableSource("ParquetTable", tableSource); String query = "SELECT foo " + - "FROM ParquetTable WHERE bar.spam >= 30 AND CARDINALITY(arr) >= 1 AND arr[1] <= 50"; + "FROM ParquetTable WHERE foo >= 1 AND bar.spam >= 
30 AND CARDINALITY(arr) >= 1 AND arr[1] <= 50"; Table table = batchTableEnvironment.sqlQuery(query); DataSet<Row> dataSet = batchTableEnvironment.toDataSet(table, Row.class); diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java index 6b5cf2a..6a4f176 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java @@ -210,6 +210,7 @@ public class TestUtil { stringArray.add("String"); final NestedRecord nestedRecord = NestedRecord.newBuilder() + .setFoo(1L) .setBar(bar) .setNestedArray(nestedArray) .setStrArray(stringArray)