This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 38e5e81  [FLINK-7244][parquet] Add ParquetTableSource.
38e5e81 is described below

commit 38e5e8161a9c763cf7df3b642830b5a97371bb00
Author: Peter Huang <huangzhenqiu0...@gmail.com>
AuthorDate: Sun Mar 24 23:19:18 2019 -0700

    [FLINK-7244][parquet] Add ParquetTableSource.
    
    This closes #8064.
---
 flink-formats/flink-parquet/pom.xml                |  28 +-
 .../flink/formats/parquet/ParquetInputFormat.java  |  16 +-
 .../formats/parquet/ParquetRowInputFormat.java     |   4 +-
 .../flink/formats/parquet/ParquetTableSource.java  | 568 +++++++++++++++++++++
 .../parquet/utils/ParquetSchemaConverter.java      |   2 +-
 .../formats/parquet/ParquetMapInputFormatTest.java |   2 +-
 .../formats/parquet/ParquetTableSourceITCase.java  | 116 +++++
 .../formats/parquet/ParquetTableSourceTest.java    | 234 +++++++++
 .../parquet/utils/ParquetRecordReaderTest.java     |   4 +-
 .../parquet/utils/ParquetSchemaConverterTest.java  |  39 +-
 .../flink/formats/parquet/utils/TestUtil.java      |  72 ++-
 .../src/test/resources/avro/nested.avsc            |   2 +-
 12 files changed, 1041 insertions(+), 46 deletions(-)
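
For reference, a minimal usage sketch assembled from the Javadoc and builder API
added by this commit (the table environment, Parquet schema, and file path are
illustrative):

    import org.apache.flink.formats.parquet.ParquetTableSource;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.parquet.schema.MessageType;

    class ParquetTableSourceUsage {
        static Table query(BatchTableEnvironment tEnv, MessageType parquetSchema) {
            // Build the source; the path may point to a single file or a directory.
            ParquetTableSource parquetSrc = ParquetTableSource.builder()
                .path("file:///my/data/file.parquet")
                .forParquetSchema(parquetSchema)
                .build();

            // Register and query; projection and filter push-down are applied by the planner.
            tEnv.registerTableSource("parquetTable", parquetSrc);
            return tEnv.sqlQuery("SELECT * FROM parquetTable");
        }
    }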

diff --git a/flink-formats/flink-parquet/pom.xml b/flink-formats/flink-parquet/pom.xml
index 4a2fd32..b43715d 100644
--- a/flink-formats/flink-parquet/pom.xml
+++ b/flink-formats/flink-parquet/pom.xml
@@ -29,7 +29,7 @@ under the License.
                <relativePath>..</relativePath>
        </parent>
 
-       <artifactId>flink-parquet</artifactId>
+       <artifactId>flink-parquet_${scala.binary.version}</artifactId>
        <name>flink-parquet</name>
 
        <packaging>jar</packaging>
@@ -39,7 +39,6 @@ under the License.
        </properties>
 
        <dependencies>
-
                <!-- Flink dependencies -->
 
                <dependency>
@@ -49,6 +48,31 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-common</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+               </dependency>
+
+               <!-- Table ecosystem -->
+               <!-- Projects depending on this project won't depend on flink-table-*. -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+                       <optional>true</optional>
+               </dependency>
+               <!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+                       <scope>provided</scope>
+                       <optional>true</optional>
+               </dependency>
+
                <!-- Parquet Dependencies -->
 
                <dependency>
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
index e7484cb..554313e 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.formats.parquet;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.CheckpointableInputFormat;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,6 +36,7 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.InputFile;
@@ -85,6 +87,8 @@ public abstract class ParquetInputFormat<E>
 
        private String[] fieldNames;
 
+       private FilterPredicate filterPredicate;
+
        private transient Counter recordConsumed;
 
        private transient MessageType expectedFileSchema;
@@ -143,6 +147,10 @@ public abstract class ParquetInputFormat<E>
                this.fieldTypes = selectFieldTypes;
        }
 
+       public void setFilterPredicate(FilterPredicate filterPredicate) {
+               this.filterPredicate = filterPredicate;
+       }
+
        @Override
        public Tuple2<Long, Long> getCurrentState() {
                return parquetRecordReader.getCurrentReadPosition();
@@ -164,7 +172,8 @@ public abstract class ParquetInputFormat<E>
                                "Escaped the file split [%s] due to mismatch of 
file schema to expected result schema",
                                split.getPath().toString()));
                } else {
-                       this.parquetRecordReader = new 
ParquetRecordReader<>(new RowReadSupport(), readSchema, FilterCompat.NOOP);
+                       this.parquetRecordReader = new 
ParquetRecordReader<>(new RowReadSupport(), readSchema,
+                               filterPredicate == null ? FilterCompat.NOOP : 
FilterCompat.get(filterPredicate));
                        this.parquetRecordReader.initialize(fileReader, 
configuration);
                        
this.parquetRecordReader.setSkipCorruptedRecord(this.skipCorruptedRecord);
 
@@ -203,6 +212,11 @@ public abstract class ParquetInputFormat<E>
                return fieldTypes;
        }
 
+       @VisibleForTesting
+       protected FilterPredicate getPredicate() {
+               return this.filterPredicate;
+       }
+
        @Override
        public void close() throws IOException {
                if (parquetRecordReader != null) {
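
A sketch of the new filter hook in isolation (the long column "foo" and the
threshold come from the tests below; any Parquet FilterPredicate can be used):

    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.ParquetRowInputFormat;
    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.schema.MessageType;

    class FilteredParquetRead {
        static ParquetRowInputFormat filteredFormat(Path path, MessageType schema) {
            ParquetRowInputFormat format = new ParquetRowInputFormat(path, schema);
            // Rows failing foo > 100 are skipped inside the Parquet reader
            // (via FilterCompat.get(...)) instead of being deserialized first.
            format.setFilterPredicate(FilterApi.gt(FilterApi.longColumn("foo"), 100L));
            return format;
        }
    }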
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
index f010a50..13da9c7 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java
@@ -31,16 +31,14 @@ import org.apache.parquet.schema.MessageType;
  */
 public class ParquetRowInputFormat extends ParquetInputFormat<Row> implements ResultTypeQueryable<Row> {
        private static final long serialVersionUID = 11L;
-       private RowTypeInfo returnType;
 
        public ParquetRowInputFormat(Path path, MessageType messageType) {
                super(path, messageType);
-               this.returnType = new RowTypeInfo(getFieldTypes(), getFieldNames());
        }
 
        @Override
        public TypeInformation<Row> getProducedType() {
-               return returnType;
+               return new RowTypeInfo(getFieldTypes(), getFieldNames());
        }
 
        @Override
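
The removed returnType field was fixed at construction time and so ignored a
projection applied later via selectFields(); computing the type on demand keeps
it in sync, as in this sketch (field names assumed from the test schema):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.ParquetRowInputFormat;
    import org.apache.flink.types.Row;
    import org.apache.parquet.schema.MessageType;

    class ProjectedProducedType {
        static TypeInformation<Row> producedType(Path path, MessageType schema) {
            ParquetRowInputFormat format = new ParquetRowInputFormat(path, schema);
            // Project after construction; the produced type must follow.
            format.selectFields(new String[] {"foo", "bar"});
            // Returns a RowTypeInfo over {foo, bar}, not the full file schema.
            return format.getProducedType();
        }
    }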
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
new file mode 100644
index 0000000..0b5d168
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetTableSource.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.And;
+import org.apache.flink.table.expressions.Attribute;
+import org.apache.flink.table.expressions.BinaryComparison;
+import org.apache.flink.table.expressions.BinaryExpression;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.GreaterThanOrEqual;
+import org.apache.flink.table.expressions.LessThan;
+import org.apache.flink.table.expressions.LessThanOrEqual;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.Not;
+import org.apache.flink.table.expressions.NotEqualTo;
+import org.apache.flink.table.expressions.Or;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.table.sources.FilterableTableSource;
+import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A TableSource to read Parquet files.
+ *
+ * <p>The {@link ParquetTableSource} supports projection and filter push-down.</p>
+ *
+ * <p>A {@link ParquetTableSource} is used as shown in the example below.
+ *
+ * <pre>
+ * {@code
+ * ParquetTableSource parquetSrc = ParquetTableSource.builder()
+ *   .path("file:///my/data/file.parquet")
+ *   .forParquetSchema(messageType)
+ *   .build();
+ *
+ * tEnv.registerTableSource("parquetTable", parquetSrc);
+ * Table res = tEnv.sqlQuery("SELECT * FROM parquetTable");
+ * }
+ * </pre>
+ */
+public class ParquetTableSource
+       implements BatchTableSource<Row>, FilterableTableSource<Row>, ProjectableTableSource<Row> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(ParquetTableSource.class);
+
+       // path to read Parquet files from
+       private final String path;
+       // schema of the Parquet file
+       private final MessageType parquetSchema;
+       // the schema of the table
+       private final TableSchema tableSchema;
+       // the configuration to read the file
+       private final Configuration parquetConfig;
+       // type information of the data returned by the InputFormat
+       private final RowTypeInfo typeInfo;
+       // list of selected Parquet fields to return
+       @Nullable
+       private final int[] selectedFields;
+       // predicate expression to apply
+       @Nullable
+       private final FilterPredicate predicate;
+       // flag whether a path is recursively enumerated
+       private final boolean recursiveEnumeration;
+
+       private boolean isFilterPushedDown;
+
+       private ParquetTableSource(String path, MessageType parquetSchema, Configuration configuration,
+                                                                       boolean recursiveEnumeration) {
+               this(path, parquetSchema, configuration, recursiveEnumeration, null, null);
+       }
+
+       private ParquetTableSource(String path, MessageType parquetSchema, Configuration configuration,
+                                                                       boolean recursiveEnumeration, @Nullable int[] selectedFields, @Nullable FilterPredicate predicate) {
+               Preconditions.checkNotNull(path, "Path must not be null.");
+               Preconditions.checkNotNull(parquetSchema, "ParquetSchema must not be null.");
+               Preconditions.checkNotNull(configuration, "Configuration must not be null");
+               this.path = path;
+               this.parquetSchema = parquetSchema;
+               this.parquetConfig = configuration;
+               this.selectedFields = selectedFields;
+               this.predicate = predicate;
+               this.recursiveEnumeration = recursiveEnumeration;
+
+               if (predicate != null) {
+                       this.isFilterPushedDown = true;
+               }
+               // determine the type information from the Parquet schema
+               RowTypeInfo typeInfoFromSchema = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(parquetSchema);
+
+               // set return type info
+               if (selectedFields == null) {
+                       this.typeInfo = typeInfoFromSchema;
+               } else {
+                       this.typeInfo = RowTypeInfo.projectFields(typeInfoFromSchema, selectedFields);
+               }
+
+               // create a TableSchema that corresponds to the Parquet schema
+               this.tableSchema = new TableSchema(
+                       typeInfoFromSchema.getFieldNames(),
+                       typeInfoFromSchema.getFieldTypes()
+               );
+       }
+
+       @Override
+       public TableSource<Row> projectFields(int[] fields) {
+               return new ParquetTableSource(path, parquetSchema, parquetConfig, recursiveEnumeration, fields, null);
+       }
+
+       @Override
+       public DataSet<Row> getDataSet(ExecutionEnvironment executionEnvironment) {
+               ParquetRowInputFormat parquetRowInputFormat = new ParquetRowInputFormat(new Path(path), parquetSchema);
+               parquetRowInputFormat.setNestedFileEnumeration(recursiveEnumeration);
+               if (selectedFields != null) {
+                       parquetRowInputFormat.selectFields(typeInfo.getFieldNames());
+               }
+
+               if (predicate != null) {
+                       parquetRowInputFormat.setFilterPredicate(predicate);
+               }
+
+               return executionEnvironment.createInput(parquetRowInputFormat).name(explainSource());
+       }
+
+       @Override
+       public TableSource<Row> applyPredicate(List<Expression> predicates) {
+
+               // try to convert Flink filter expressions to Parquet FilterPredicates
+               List<FilterPredicate> convertedPredicates = new ArrayList<>(predicates.size());
+               List<Expression> unsupportedExpressions = new ArrayList<>(predicates.size());
+
+               for (Expression toConvert : predicates) {
+                       FilterPredicate convertedPredicate = toParquetPredicate(toConvert);
+                       if (convertedPredicate != null) {
+                               convertedPredicates.add(convertedPredicate);
+                       } else {
+                               unsupportedExpressions.add(toConvert);
+                       }
+               }
+
+               // update list of Flink expressions to unsupported expressions
+               predicates.clear();
+               predicates.addAll(unsupportedExpressions);
+
+               // construct single Parquet FilterPredicate
+               FilterPredicate parquetPredicate = null;
+               if (!convertedPredicates.isEmpty()) {
+                       // concat converted predicates with AND
+                       parquetPredicate = convertedPredicates.get(0);
+
+                       for (FilterPredicate converted : convertedPredicates.subList(1, convertedPredicates.size())) {
+                               parquetPredicate = FilterApi.and(parquetPredicate, converted);
+                       }
+               }
+
+               // create and return a new ParquetTableSource with Parquet FilterPredicate
+               return new ParquetTableSource(path, parquetSchema, this.parquetConfig, recursiveEnumeration, selectedFields, parquetPredicate);
+       }
+
+       @Override
+       public boolean isFilterPushedDown() {
+               return isFilterPushedDown;
+       }
+
+       @Override
+       public TypeInformation<Row> getReturnType() {
+               return typeInfo;
+       }
+
+       @Override
+       public TableSchema getTableSchema() {
+               return tableSchema;
+       }
+
+       @Override
+       public String explainSource() {
+               return "ParquetFile[path=" + path + ", schema=" + parquetSchema 
+ ", filter=" + predicateString()
+                       + ", typeInfo=" + typeInfo + "]";
+       }
+
+       private String predicateString() {
+               if (predicate != null) {
+                       return predicate.toString();
+               } else {
+                       return "TRUE";
+               }
+       }
+
+       /**
+        * Converts Flink Expression to Parquet FilterPredicate.
+        */
+       @Nullable
+       private FilterPredicate toParquetPredicate(Expression exp) {
+               if (exp instanceof Not) {
+                       FilterPredicate c = toParquetPredicate(((Not) exp).child());
+                       if (c == null) {
+                               return null;
+                       } else {
+                               return FilterApi.not(c);
+                       }
+               } else if (exp instanceof BinaryComparison) {
+                       BinaryComparison binComp = (BinaryComparison) exp;
+
+                       if (!isValid(binComp)) {
+                               // unsupported literal Type
+                               LOG.debug("Unsupported predict [{}] cannot be 
pushed to ParquetTableSource.", exp);
+                               return null;
+                       }
+
+                       boolean onRight = literalOnRight(binComp);
+                       Tuple2<Column, Comparable> columnPair = extractColumnAndLiteral(binComp);
+
+                       if (columnPair != null) {
+                               if (exp instanceof EqualTo) {
+                                       if (columnPair.f0 instanceof IntColumn) {
+                                               return FilterApi.eq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+                                       } else if (columnPair.f0 instanceof LongColumn) {
+                                               return FilterApi.eq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+                                       } else if (columnPair.f0 instanceof DoubleColumn) {
+                                               return FilterApi.eq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+                                       } else if (columnPair.f0 instanceof FloatColumn) {
+                                               return FilterApi.eq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+                                       } else if (columnPair.f0 instanceof BooleanColumn) {
+                                               return FilterApi.eq((BooleanColumn) columnPair.f0, (Boolean) columnPair.f1);
+                                       } else if (columnPair.f0 instanceof BinaryColumn) {
+                                               return FilterApi.eq((BinaryColumn) columnPair.f0, (Binary) columnPair.f1);
+                                       }
+                               } else if (exp instanceof NotEqualTo) {
+                                       if (columnPair.f0 instanceof IntColumn) {
+                                               return FilterApi.notEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+                                       } else if (columnPair.f0 instanceof LongColumn) {
+                                               return FilterApi.notEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+                                       } else if (columnPair.f0 instanceof DoubleColumn) {
+                                               return FilterApi.notEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+                                       } else if (columnPair.f0 instanceof FloatColumn) {
+                                               return FilterApi.notEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+                                       } else if (columnPair.f0 instanceof BooleanColumn) {
+                                               return FilterApi.notEq((BooleanColumn) columnPair.f0, (Boolean) columnPair.f1);
+                                       } else if (columnPair.f0 instanceof BinaryColumn) {
+                                               return FilterApi.notEq((BinaryColumn) columnPair.f0, (Binary) columnPair.f1);
+                                       }
+                               } else if (exp instanceof GreaterThan) {
+                                       if (onRight) {
+                                               return greaterThan(exp, columnPair);
+                                       } else {
+                                               return lessThan(exp, columnPair);
+                                       }
+                               } else if (exp instanceof GreaterThanOrEqual) {
+                                       if (onRight) {
+                                               return greaterThanOrEqual(exp, columnPair);
+                                       } else {
+                                               return lessThanOrEqual(exp, columnPair);
+                                       }
+                               } else if (exp instanceof LessThan) {
+                                       if (onRight) {
+                                               return lessThan(exp, columnPair);
+                                       } else {
+                                               return greaterThan(exp, columnPair);
+                                       }
+                               } else if (exp instanceof LessThanOrEqual) {
+                                       if (onRight) {
+                                               return lessThanOrEqual(exp, columnPair);
+                                       } else {
+                                               return greaterThanOrEqual(exp, columnPair);
+                                       }
+                               } else {
+                                       // Unsupported Predicate
+                                       LOG.debug("Unsupported predicate [{}] 
cannot be pushed into ParquetTableSource.", exp);
+                                       return null;
+                               }
+                       }
+               } else if (exp instanceof BinaryExpression) {
+                       if (exp instanceof And) {
+                               LOG.debug("All of the predicates should be in 
CNF. Found an AND expression.", exp);
+                       } else if (exp instanceof Or) {
+                               FilterPredicate c1 = toParquetPredicate(((Or) exp).left());
+                               FilterPredicate c2 = toParquetPredicate(((Or) exp).right());
+
+                               if (c1 == null || c2 == null) {
+                                       return null;
+                               } else {
+                                       return FilterApi.or(c1, c2);
+                               }
+                       } else {
+                               // Unsupported Predicate
+                               LOG.debug("Unsupported predicate [{}] cannot be 
pushed into ParquetTableSource.", exp);
+                               return null;
+                       }
+               }
+
+               return null;
+       }
+
+       @Nullable
+       private FilterPredicate greaterThan(Expression exp, Tuple2<Column, Comparable> columnPair) {
+               Preconditions.checkArgument(exp instanceof GreaterThan || exp instanceof LessThan,
+                       "exp has to be GreaterThan or a mirrored LessThan");
+               if (columnPair.f0 instanceof IntColumn) {
+                       return FilterApi.gt((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+               } else if (columnPair.f0 instanceof LongColumn) {
+                       return FilterApi.gt((LongColumn) columnPair.f0, (Long) columnPair.f1);
+               } else if (columnPair.f0 instanceof DoubleColumn) {
+                       return FilterApi.gt((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+               } else if (columnPair.f0 instanceof FloatColumn) {
+                       return FilterApi.gt((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+               }
+
+               return null;
+       }
+
+       @Nullable
+       private FilterPredicate lessThan(Expression exp, Tuple2<Column, Comparable> columnPair) {
+               Preconditions.checkArgument(exp instanceof LessThan || exp instanceof GreaterThan,
+                       "exp has to be LessThan or a mirrored GreaterThan");
+
+               if (columnPair.f0 instanceof IntColumn) {
+                       return FilterApi.lt((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+               } else if (columnPair.f0 instanceof LongColumn) {
+                       return FilterApi.lt((LongColumn) columnPair.f0, (Long) columnPair.f1);
+               } else if (columnPair.f0 instanceof DoubleColumn) {
+                       return FilterApi.lt((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+               } else if (columnPair.f0 instanceof FloatColumn) {
+                       return FilterApi.lt((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+               }
+
+               return null;
+       }
+
+       @Nullable
+       private FilterPredicate greaterThanOrEqual(Expression exp, Tuple2<Column, Comparable> columnPair) {
+               Preconditions.checkArgument(exp instanceof GreaterThanOrEqual || exp instanceof LessThanOrEqual,
+                       "exp has to be GreaterThanOrEqual or a mirrored LessThanOrEqual");
+               if (columnPair.f0 instanceof IntColumn) {
+                       return FilterApi.gtEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+               } else if (columnPair.f0 instanceof LongColumn) {
+                       return FilterApi.gtEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+               } else if (columnPair.f0 instanceof DoubleColumn) {
+                       return FilterApi.gtEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+               } else if (columnPair.f0 instanceof FloatColumn) {
+                       return FilterApi.gtEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+               }
+
+               return null;
+       }
+
+       @Nullable
+       private FilterPredicate lessThanOrEqual(Expression exp, Tuple2<Column, Comparable> columnPair) {
+               Preconditions.checkArgument(exp instanceof LessThanOrEqual || exp instanceof GreaterThanOrEqual,
+                       "exp has to be LessThanOrEqual or a mirrored GreaterThanOrEqual");
+               if (columnPair.f0 instanceof IntColumn) {
+                       return FilterApi.ltEq((IntColumn) columnPair.f0, (Integer) columnPair.f1);
+               } else if (columnPair.f0 instanceof LongColumn) {
+                       return FilterApi.ltEq((LongColumn) columnPair.f0, (Long) columnPair.f1);
+               } else if (columnPair.f0 instanceof DoubleColumn) {
+                       return FilterApi.ltEq((DoubleColumn) columnPair.f0, (Double) columnPair.f1);
+               } else if (columnPair.f0 instanceof FloatColumn) {
+                       return FilterApi.ltEq((FloatColumn) columnPair.f0, (Float) columnPair.f1);
+               }
+
+               return null;
+       }
+
+       private boolean isValid(BinaryComparison comp) {
+               return (comp.left() instanceof Literal && comp.right() instanceof Attribute) ||
+                       (comp.left() instanceof Attribute && comp.right() instanceof Literal);
+       }
+
+       private boolean literalOnRight(BinaryComparison comp) {
+               if (comp.left() instanceof Literal && comp.right() instanceof Attribute) {
+                       return false;
+               } else if (comp.left() instanceof Attribute && comp.right() instanceof Literal) {
+                       return true;
+               } else {
+                       throw new RuntimeException("Invalid binary comparison.");
+               }
+       }
+
+       private TypeInformation<?> getLiteralType(BinaryComparison comp) {
+               if (literalOnRight(comp)) {
+                       return ((Literal) comp.right()).resultType();
+               } else {
+                       return ((Literal) comp.left()).resultType();
+               }
+       }
+
+       private Object getLiteral(BinaryComparison comp) {
+               if (literalOnRight(comp)) {
+                       return ((Literal) comp.right()).value();
+               } else {
+                       return ((Literal) comp.left()).value();
+               }
+       }
+
+       private String getColumnName(BinaryComparison comp) {
+               if (literalOnRight(comp)) {
+                       return ((Attribute) comp.left()).name();
+               } else {
+                       return ((Attribute) comp.right()).name();
+               }
+       }
+
+       @Nullable
+       private Tuple2<Column, Comparable> extractColumnAndLiteral(BinaryComparison comp) {
+               TypeInformation<?> typeInfo = getLiteralType(comp);
+               String columnName = getColumnName(comp);
+
+               // fetch literal and ensure it is comparable
+               Object value = getLiteral(comp);
+               // validate that literal is comparable
+               if (!(value instanceof Comparable)) {
+                       LOG.warn("Encountered a non-comparable literal of type 
{}." +
+                               "Cannot push predicate [{}] into 
ParquetTablesource." +
+                               "This is a bug and should be reported.", 
value.getClass().getCanonicalName(), comp);
+                       return null;
+               }
+
+               if (typeInfo == BasicTypeInfo.BYTE_TYPE_INFO ||
+                       typeInfo == BasicTypeInfo.SHORT_TYPE_INFO ||
+                       typeInfo == BasicTypeInfo.INT_TYPE_INFO) {
+                       return new Tuple2<>(FilterApi.intColumn(columnName), ((Number) value).intValue());
+               } else if (typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
+                       return new Tuple2<>(FilterApi.longColumn(columnName), (Long) value);
+               } else if (typeInfo == BasicTypeInfo.FLOAT_TYPE_INFO) {
+                       return new Tuple2<>(FilterApi.floatColumn(columnName), (Float) value);
+               } else if (typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+                       return new Tuple2<>(FilterApi.booleanColumn(columnName), (Boolean) value);
+               } else if (typeInfo == BasicTypeInfo.DOUBLE_TYPE_INFO) {
+                       return new Tuple2<>(FilterApi.doubleColumn(columnName), (Double) value);
+               } else if (typeInfo == BasicTypeInfo.STRING_TYPE_INFO) {
+                       return new Tuple2<>(FilterApi.binaryColumn(columnName), Binary.fromString((String) value));
+               } else {
+                       // unsupported type
+                       return null;
+               }
+       }
+
+       // Builder
+       public static Builder builder() {
+               return new Builder();
+       }
+
+       /**
+        * Constructs a {@link ParquetTableSource}.
+        */
+       public static class Builder {
+
+               private String path;
+
+               private MessageType schema;
+
+               private Configuration config;
+
+               private boolean recursive = true;
+
+               /**
+                * Sets the path of the Parquet files.
+                * If the path specifies a directory, it will be recursively enumerated.
+                *
+                * @param path the path of the Parquet files.
+                * @return The Builder
+                */
+               public Builder path(String path) {
+                       Preconditions.checkNotNull(path, "Path must not be null");
+                       Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty");
+                       this.path = path;
+                       return this;
+               }
+
+               /**
+                * Sets the path of the Parquet files.
+                *
+                * @param path The path of the Parquet files
+                * @param recursive Flag whether a directory path should be recursively enumerated
+                * @return The Builder
+                */
+               public Builder path(String path, boolean recursive) {
+                       Preconditions.checkNotNull(path, "Path must not be null");
+                       Preconditions.checkArgument(!path.isEmpty(), "Path must not be empty");
+                       this.path = path;
+                       this.recursive = recursive;
+                       return this;
+               }
+
+               /**
+                * Sets the Parquet schema of the files to read.
+                *
+                * @param parquetSchema The Parquet schema of the files to read.
+                * @return The Builder
+                */
+               public Builder forParquetSchema(MessageType parquetSchema) {
+                       Preconditions.checkNotNull(parquetSchema, "Parquet schema must not be null");
+                       this.schema = parquetSchema;
+                       return this;
+               }
+
+               /**
+                * Sets a Hadoop {@link Configuration} for the Parquet reader. If no configuration is provided,
+                * an empty configuration is used.
+                *
+                * @param config The Hadoop Configuration for the Parquet reader.
+                * @return The Builder
+                */
+               public Builder withConfiguration(Configuration config) {
+                       Preconditions.checkNotNull(config, "Configuration must not be null.");
+                       this.config = config;
+                       return this;
+               }
+
+               /**
+                * Builds the ParquetTableSource for this builder.
+                *
+                * @return The ParquetTableSource for this builder.
+                */
+               public ParquetTableSource build() {
+                       Preconditions.checkNotNull(path, "Path must not be null");
+                       Preconditions.checkNotNull(schema, "Parquet schema must not be null");
+                       if (config == null) {
+                               this.config = new Configuration();
+                       }
+
+                       return new ParquetTableSource(this.path, this.schema, this.config, this.recursive);
+               }
+       }
+}
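
For intuition, the AND-fold in applyPredicate() yields, for the two supported
predicates used in ParquetTableSourceTest below, the equivalent of:

    import org.apache.parquet.filter2.predicate.FilterApi;
    import org.apache.parquet.filter2.predicate.FilterPredicate;

    class PushedPredicate {
        static FilterPredicate combined() {
            FilterPredicate first = FilterApi.gt(FilterApi.longColumn("foo"), 100L);
            FilterPredicate second = FilterApi.eq(FilterApi.longColumn("bar.spam"), 100L);
            // Converted predicates are concatenated with AND; unsupported
            // expressions stay in the list handed back to the planner.
            return FilterApi.and(first, second);
        }
    }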
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
index 35e1977..084c060 100644
--- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java
@@ -43,7 +43,7 @@ import java.util.List;
  * Schema converter converts Parquet schema to and from Flink internal types.
  */
 public class ParquetSchemaConverter {
-       private static final Logger LOGGER = LoggerFactory.getLogger(RowConverter.class);
+       private static final Logger LOGGER = LoggerFactory.getLogger(ParquetSchemaConverter.class);
        public static final String MAP_VALUE = "value";
        public static final String LIST_ARRAY_TYPE = "array";
        public static final String LIST_ELEMENT = "element";
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
index 2911166..f36b12c 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java
@@ -77,7 +77,7 @@ public class ParquetMapInputFormatTest {
                List<Map<String, String>> nestedArray = (List<Map<String, String>>) map.get("nestedArray");
                assertEquals(1, nestedArray.size());
                assertEquals("color", nestedArray.get(0).get("type"));
-               assertEquals("yellow", nestedArray.get(0).get("value"));
+               assertEquals(1L, nestedArray.get(0).get("value"));
        }
 
        @Test
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
new file mode 100644
index 0000000..d8eba5e
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceITCase.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration tests for {@link ParquetTableSource}.
+ */
+public class ParquetTableSourceITCase extends MultipleProgramsTestBase {
+       private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+       private static Path testPath;
+
+       @ClassRule
+       public static TemporaryFolder tempRoot = new TemporaryFolder();
+
+       public ParquetTableSourceITCase() {
+               super(TestExecutionMode.COLLECTION);
+       }
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               testPath = createTestParquetFile(1000);
+       }
+
+       @Test
+       public void testFullScan() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
+               ParquetTableSource tableSource = createParquetTableSource(testPath);
+               batchTableEnvironment.registerTableSource("ParquetTable", tableSource);
+               String query =
+                       "SELECT foo " +
+                       "FROM ParquetTable";
+
+               Table table = batchTableEnvironment.sqlQuery(query);
+               DataSet<Row> dataSet = batchTableEnvironment.toDataSet(table, Row.class);
+               List<Row> result = dataSet.collect();
+
+               assertEquals(1000, result.size());
+       }
+
+       @Test
+       public void testScanWithProjectionAndFilter() throws Exception {
+               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+               BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(env);
+               ParquetTableSource tableSource = createParquetTableSource(testPath);
+               batchTableEnvironment.registerTableSource("ParquetTable", tableSource);
+               String query =
+                       "SELECT foo " +
+                       "FROM ParquetTable WHERE bar.spam >= 30 AND 
CARDINALITY(arr) >= 1 AND arr[1] <= 50";
+
+               Table table = batchTableEnvironment.sqlQuery(query);
+               DataSet<Row> dataSet = batchTableEnvironment.toDataSet(table, Row.class);
+               List<Row> result = dataSet.collect();
+
+               assertEquals(21, result.size());
+       }
+
+       /**
+        * Create a test Parquet table source that reads a test file created by {@link #createTestParquetFile(int)}.
+        */
+       private ParquetTableSource createParquetTableSource(Path path) throws IOException {
+               MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+               ParquetTableSource parquetTableSource = ParquetTableSource.builder()
+                       .path(path.getPath())
+                       .forParquetSchema(nestedSchema)
+                       .build();
+               return parquetTableSource;
+       }
+
+       /**
+        * Create a test Parquet file with a given number of rows.
+        */
+       private static Path createTestParquetFile(int numberOfRows) throws Exception {
+               List<IndexedRecord> records = TestUtil.createRecordList(numberOfRows);
+               Path path = TestUtil.createTempParquetFile(tempRoot.getRoot(), TestUtil.NESTED_SCHEMA, records);
+               return path;
+       }
+}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
new file mode 100644
index 0000000..1e6ebf8
--- /dev/null
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetTableSourceTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.formats.parquet.utils.TestUtil;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GetCompositeField;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.ItemAt;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.PlannerResolvedFieldReference;
+import org.apache.flink.types.Row;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.schema.MessageType;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link ParquetTableSource}.
+ */
+public class ParquetTableSourceTest extends TestUtil {
+       private static final AvroSchemaConverter SCHEMA_CONVERTER = new AvroSchemaConverter();
+       private static Path testPath;
+
+       @ClassRule
+       public static TemporaryFolder tempRoot = new TemporaryFolder();
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               testPath = createTestParquetFile();
+       }
+
+       @Test
+       public void testGetReturnType() {
+               MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+               ParquetTableSource parquetTableSource = ParquetTableSource.builder()
+                       .path("dummy-path")
+                       .forParquetSchema(nestedSchema)
+                       .build();
+
+               TypeInformation<Row> returnType = parquetTableSource.getReturnType();
+               assertNotNull(returnType);
+               assertTrue(returnType instanceof RowTypeInfo);
+               RowTypeInfo rowType = (RowTypeInfo) returnType;
+               assertEquals(NESTED_ROW_TYPE, rowType);
+       }
+
+       @Test
+       public void testGetTableSchema() {
+               MessageType nestedSchema = SCHEMA_CONVERTER.convert(TestUtil.NESTED_SCHEMA);
+               ParquetTableSource parquetTableSource = ParquetTableSource.builder()
+                       .path("dummy-path")
+                       .forParquetSchema(nestedSchema)
+                       .build();
+
+               TableSchema schema = parquetTableSource.getTableSchema();
+               assertNotNull(schema);
+
+               RowTypeInfo expectedSchema = (RowTypeInfo) NESTED_ROW_TYPE;
+               assertArrayEquals(expectedSchema.getFieldNames(), schema.getFieldNames());
+               assertArrayEquals(expectedSchema.getFieldTypes(), schema.getFieldTypes());
+       }
+
+       @Test
+       public void testFieldsProjection() throws Exception {
+               ParquetTableSource parquetTableSource = createNestedTestParquetTableSource(testPath);
+               ParquetTableSource projected = (ParquetTableSource) parquetTableSource.projectFields(new int[] {2, 4, 6});
+
+               // ensure a new reference is returned
+               assertNotSame(projected, parquetTableSource);
+
+               // ensure table schema is the same
+               assertEquals(parquetTableSource.getTableSchema(), projected.getTableSchema());
+
+               // ensure that table source description differs
+               assertNotEquals(parquetTableSource.explainSource(), projected.explainSource());
+
+               String[] fieldNames = ((RowTypeInfo) NESTED_ROW_TYPE).getFieldNames();
+               TypeInformation[] fieldTypes = ((RowTypeInfo) NESTED_ROW_TYPE).getFieldTypes();
+               assertEquals(
+                       Types.ROW_NAMED(
+                               new String[] {fieldNames[2], fieldNames[4], fieldNames[6]},
+                               fieldTypes[2], fieldTypes[4], fieldTypes[6]
+                       ),
+                       projected.getReturnType()
+               );
+
+               // ensure ParquetInputFormat is configured with selected fields
+               DataSet<Row> data = projected.getDataSet(ExecutionEnvironment.createLocalEnvironment());
+               InputFormat<Row, ?> inputFormat = ((DataSource<Row>) data).getInputFormat();
+               assertTrue(inputFormat instanceof ParquetRowInputFormat);
+               ParquetRowInputFormat parquetIF = (ParquetRowInputFormat) inputFormat;
+               assertArrayEquals(new String[]{fieldNames[2], fieldNames[4], fieldNames[6]}, parquetIF.getFieldNames());
+               assertArrayEquals(new TypeInformation<?>[]{fieldTypes[2], fieldTypes[4], fieldTypes[6]}, parquetIF.getFieldTypes());
+       }
+
+       @Test
+       public void testFieldsFilter() throws Exception {
+               ParquetTableSource parquetTableSource = createNestedTestParquetTableSource(testPath);
+
+               // expressions for supported predicates
+               Expression exp1 = new GreaterThan(
+                       new PlannerResolvedFieldReference("foo", Types.LONG),
+                       new Literal(100L, Types.LONG));
+               Expression exp2 = new EqualTo(
+                       new Literal(100L, Types.LONG),
+                       new PlannerResolvedFieldReference("bar.spam", 
Types.LONG));
+
+               // unsupported predicate
+               Expression unsupported = new EqualTo(
+                       new GetCompositeField(
+                               new ItemAt(
+                                       new PlannerResolvedFieldReference(
+                                               "nestedArray",
+                                               ObjectArrayTypeInfo.getInfoFor(
+                                       Types.ROW_NAMED(new String[] {"type", "name"}, Types.STRING, Types.STRING))),
+                                               new Literal(1, Types.INT)),
+                                               "type"),
+                       new Literal("test", Types.STRING));
+               // invalid predicate
+               Expression invalidPred = new EqualTo(
+                       new PlannerResolvedFieldReference("nonField", 
Types.LONG),
+                       // some invalid, non-serializable, literal (here an 
object of this test class)
+                       new Literal(new ParquetTableSourceTest(), Types.LONG)
+               );
+
+               List<Expression> exps = new ArrayList<>();
+               exps.add(exp1);
+               exps.add(exp2);
+               exps.add(unsupported);
+               exps.add(invalidPred);
+
+               // apply predicates on TableSource
+               ParquetTableSource filtered = (ParquetTableSource) parquetTableSource.applyPredicate(exps);
+
+               // ensure copy is returned
+               assertNotSame(parquetTableSource, filtered);
+
+               // ensure table schema is identical
+               assertEquals(parquetTableSource.getTableSchema(), filtered.getTableSchema());
+
+               // ensure return type is identical
+               assertEquals(NESTED_ROW_TYPE, filtered.getReturnType());
+
+               // ensure source description is not the same
+               assertNotEquals(parquetTableSource.explainSource(), filtered.explainSource());
+
+               // check that pushdown was recorded
+               assertTrue(filtered.isFilterPushedDown());
+               assertFalse(parquetTableSource.isFilterPushedDown());
+
+               // ensure that supported predicates were removed from list of offered expressions
+               assertEquals(2, exps.size());
+               assertTrue(exps.contains(unsupported));
+               assertTrue(exps.contains(invalidPred));
+
+               // ensure ParquetInputFormat is correctly configured with filter
+               DataSet<Row> data = filtered.getDataSet(ExecutionEnvironment.createLocalEnvironment());
+               InputFormat<Row, ?> inputFormat = ((DataSource<Row>) data).getInputFormat();
+               assertTrue(inputFormat instanceof ParquetRowInputFormat);
+               ParquetRowInputFormat parquetIF = (ParquetRowInputFormat) inputFormat;
+
+               // expected predicate
+               FilterPredicate a = FilterApi.gt(FilterApi.longColumn("foo"), 
100L);
+               FilterPredicate b = 
FilterApi.eq(FilterApi.longColumn("bar.spam"), 100L);
+               FilterPredicate expected = FilterApi.and(a, b);
+               // actual predicate
+               FilterPredicate predicate = parquetIF.getPredicate();
+               // check predicate
+               assertEquals(expected, predicate);
+       }
+
+       private static Path createTestParquetFile() throws Exception {
+               Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> nested = getNestedRecordTestData();
+               Path path = createTempParquetFile(tempRoot.getRoot(), NESTED_SCHEMA,
+                       Collections.singletonList(nested.f1));
+               return path;
+       }
+
+       private ParquetTableSource createNestedTestParquetTableSource(Path path) throws Exception {
+               MessageType nestedSchema = SCHEMA_CONVERTER.convert(NESTED_SCHEMA);
+               ParquetTableSource parquetTableSource = ParquetTableSource.builder()
+                       .path(path.getPath())
+                       .forParquetSchema(nestedSchema)
+                       .build();
+               return parquetTableSource;
+       }
+}
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
index d021fc3..6c79605 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java
@@ -279,7 +279,7 @@ public class ParquetRecordReaderTest extends TestUtil {
                Schema arrayItemSchema = nestedArraySchema.getElementType();
                GenericRecord item = new GenericRecordBuilder(arrayItemSchema)
                        .set("type", "nested")
-                       .set("value", "nested_value").build();
+                       .set("value", 1L).build();
 
                ImmutableList.Builder<GenericRecord> list = ImmutableList.builder();
                list.add(item);
@@ -310,7 +310,7 @@ public class ParquetRecordReaderTest extends TestUtil {
 
                Row nestedRow = (Row) result[0];
                assertEquals("nested", nestedRow.getField(0));
-               assertEquals("nested_value", nestedRow.getField(1));
+               assertEquals(1L, nestedRow.getField(1));
        }
 
        private Schema unWrapSchema(Schema o) {
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
index ce13c8d..10db6d2 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java
@@ -18,12 +18,7 @@
 
 package org.apache.flink.formats.parquet.utils;
 
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
 
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
@@ -32,7 +27,6 @@ import org.apache.parquet.schema.Type;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
@@ -40,27 +34,6 @@ import static org.junit.Assert.assertEquals;
 * Simple test case for conversion between Parquet schema and Flink data types.
  */
 public class ParquetSchemaConverterTest extends TestUtil {
-       private final TypeInformation<Row> simplyRowType = Types.ROW_NAMED(new String[] {"foo", "bar", "arr"},
-               BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO);
-
-       private final TypeInformation<Row[]> nestedArray = Types.OBJECT_ARRAY(Types.ROW_NAMED(new String[] {"type", "value"},
-               BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
-
-       @SuppressWarnings("unchecked")
-       private final TypeInformation<Map<String, Row>> nestedMap = Types.MAP(BasicTypeInfo.STRING_TYPE_INFO,
-               Types.ROW_NAMED(new String[] {"type", "value"},
-                       BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
-
-       @SuppressWarnings("unchecked")
-       private final TypeInformation<Row> nestedRowType = Types.ROW_NAMED(
-               new String[] {"foo", "spamMap", "bar", "arr", "strArray", "nestedMap", "nestedArray"},
-               BasicTypeInfo.LONG_TYPE_INFO,
-               Types.MAP(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
-               Types.ROW_NAMED(new String[] {"spam"}, BasicTypeInfo.LONG_TYPE_INFO),
-               BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO,
-               BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
-               nestedMap,
-               nestedArray);
 
        private final Type[] simpleStandardTypes = {
                org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.OPTIONAL)
@@ -102,8 +75,8 @@ public class ParquetSchemaConverterTest extends TestUtil {
                org.apache.parquet.schema.Types.optionalGroup().addField(org.apache.parquet.schema.Types.repeatedGroup()
                        .addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
                                .as(OriginalType.UTF8).named("type"))
-                       .addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED)
-                               .as(OriginalType.UTF8).named("value"))
+                       .addField(org.apache.parquet.schema.Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, Type.Repetition.REQUIRED)
+                               .as(OriginalType.INT_64).named("value"))
                        .named("element")).as(OriginalType.LIST)
                        .named("nestedArray")
        };
@@ -112,25 +85,25 @@ public class ParquetSchemaConverterTest extends TestUtil {
        public void testSimpleSchemaConversion() {
                MessageType simpleType = new MessageType("simple", simpleStandardTypes);
                RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(simpleType);
-               assertEquals(simplyRowType, rowTypeInfo);
+               assertEquals(SIMPLE_ROW_TYPE, rowTypeInfo);
        }
 
        @Test
        public void testNestedSchemaConversion() {
                MessageType nestedTypes = new MessageType("nested", this.nestedTypes);
                RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(nestedTypes);
-               assertEquals(nestedRowType, rowTypeInfo);
+               assertEquals(NESTED_ROW_TYPE, rowTypeInfo);
        }
 
        @Test
        public void testSimpleRowTypeConversion() {
-               MessageType simpleSchema = ParquetSchemaConverter.toParquetType(simplyRowType, true);
+               MessageType simpleSchema = ParquetSchemaConverter.toParquetType(SIMPLE_ROW_TYPE, true);
                assertEquals(Arrays.asList(simpleStandardTypes), simpleSchema.getFields());
        }
 
        @Test
        public void testNestedRowTypeConversion() {
-               MessageType nestedSchema = ParquetSchemaConverter.toParquetType(nestedRowType, true);
+               MessageType nestedSchema = ParquetSchemaConverter.toParquetType(NESTED_ROW_TYPE, true);
                assertEquals(Arrays.asList(nestedTypes), nestedSchema.getFields());
        }
 }
diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
index ed64041..6b5cf2a 100644
--- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
+++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java
@@ -19,6 +19,10 @@
 package org.apache.flink.formats.parquet.utils;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.generated.ArrayItem;
@@ -52,11 +56,33 @@ import java.util.UUID;
  * Utilities for testing schema conversion and test parquet file creation.
  */
 public class TestUtil {
+       private static final TypeInformation<Row[]> nestedArray = Types.OBJECT_ARRAY(Types.ROW_NAMED(
+               new String[] {"type", "value"}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO));
+
+       @SuppressWarnings("unchecked")
+       private static final TypeInformation<Map<String, Row>> nestedMap = Types.MAP(BasicTypeInfo.STRING_TYPE_INFO,
+               Types.ROW_NAMED(new String[] {"type", "value"},
+                       BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+
        @ClassRule
        public static TemporaryFolder tempRoot = new TemporaryFolder();
        public static final Schema NESTED_SCHEMA = getTestSchema("nested.avsc");
        public static final Schema SIMPLE_SCHEMA = getTestSchema("simple.avsc");
 
+       public static final TypeInformation<Row> SIMPLE_ROW_TYPE = Types.ROW_NAMED(new String[] {"foo", "bar", "arr"},
+               BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO);
+
+       @SuppressWarnings("unchecked")
+       public static final TypeInformation<Row> NESTED_ROW_TYPE = Types.ROW_NAMED(
+               new String[] {"foo", "spamMap", "bar", "arr", "strArray", "nestedMap", "nestedArray"},
+               BasicTypeInfo.LONG_TYPE_INFO,
+               Types.MAP(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
+               Types.ROW_NAMED(new String[] {"spam"}, BasicTypeInfo.LONG_TYPE_INFO),
+               BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO,
+               BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO,
+               nestedMap,
+               nestedArray);
+
        public static Path createTempParquetFile(File folder, Schema schema, List<IndexedRecord> records) throws IOException {
                Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
                ParquetWriter<IndexedRecord> writer = AvroParquetWriter.<IndexedRecord>builder(
@@ -96,7 +122,7 @@ public class TestUtil {
 
                final ArrayItem arrayItem = ArrayItem.newBuilder()
                        .setType("color")
-                       .setValue("yellow").build();
+                       .setValue(1L).build();
 
                final MapItem mapItem = MapItem.newBuilder()
                        .setType("map")
@@ -129,7 +155,7 @@ public class TestUtil {
 
                final Row arrayItemRow = new Row(2);
                arrayItemRow.setField(0, "color");
-               arrayItemRow.setField(1, "yellow");
+               arrayItemRow.setField(1, 1L);
 
                final Row mapItemRow = new Row(2);
                mapItemRow.setField(0, "map");
@@ -154,6 +180,48 @@ public class TestUtil {
                return t;
        }
 
+       /**
+        * Create a list of NestedRecord with the NESTED_SCHEMA.
+        */
+       public static List<IndexedRecord> createRecordList(long numberOfRows) {
+               List<IndexedRecord> records = new ArrayList<>(0);
+               for (long i = 0; i < numberOfRows; i++) {
+                       final Bar bar = Bar.newBuilder()
+                               .setSpam(i).build();
+
+                       final ArrayItem arrayItem = ArrayItem.newBuilder()
+                               .setType("color")
+                               .setValue(i).build();
+
+                       final MapItem mapItem = MapItem.newBuilder()
+                               .setType("map")
+                               .setValue("hashMap").build();
+
+                       List<ArrayItem> nestedArray = new ArrayList<>();
+                       nestedArray.add(arrayItem);
+
+                       Map<CharSequence, MapItem> nestedMap = new HashMap<>();
+                       nestedMap.put("mapItem", mapItem);
+
+                       List<Long> longArray = new ArrayList<>();
+                       longArray.add(i);
+
+                       List<CharSequence> stringArray = new ArrayList<>();
+                       stringArray.add("String");
+
+                       final NestedRecord nestedRecord = NestedRecord.newBuilder()
+                               .setBar(bar)
+                               .setNestedArray(nestedArray)
+                               .setStrArray(stringArray)
+                               .setNestedMap(nestedMap)
+                               .setArr(longArray).build();
+
+                       records.add(nestedRecord);
+               }
+
+               return records;
+       }
+
        public static RuntimeContext getMockRuntimeContext() {
                RuntimeContext mockContext = Mockito.mock(RuntimeContext.class);
                Mockito.doReturn(UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup())
diff --git a/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc b/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc
index 2517c61..eb60752 100644
--- a/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc
+++ b/flink-formats/flink-parquet/src/test/resources/avro/nested.avsc
@@ -26,7 +26,7 @@
         "name": "ArrayItem",
         "fields": [
            {"name": "type", "type": "string"},
-           {"name": "value", "type": "string"}]}
+           {"name": "value", "type": "long"}]}
         }]
       }
   ],

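A minimal usage sketch of the new ParquetTableSource, for readers who want to
see the pieces above put together. This is not part of the commit: the class
name, the /tmp/data.parquet path, and the two-column row type are made-up
examples, and BatchTableEnvironment.create assumes a Flink 1.8+ API. The
builder and schema-converter calls themselves mirror the test code in this
patch.

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.formats.parquet.ParquetTableSource;
    import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.parquet.schema.MessageType;

    public class ParquetTableSourceSketch {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

            // Derive the Parquet schema from a Flink row type, mirroring
            // ParquetSchemaConverterTest#testSimpleRowTypeConversion above.
            TypeInformation<Row> rowType = Types.ROW_NAMED(
                new String[] {"foo", "bar"},
                BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
            MessageType parquetSchema = ParquetSchemaConverter.toParquetType(rowType, true);

            // Builder calls as in createNestedTestParquetTableSource above;
            // the file path is a hypothetical example.
            ParquetTableSource source = ParquetTableSource.builder()
                .path("/tmp/data.parquet")
                .forParquetSchema(parquetSchema)
                .build();

            tEnv.registerTableSource("ParquetTable", source);

            // Predicates like this one can be pushed into the ParquetInputFormat
            // as FilterApi predicates, as asserted in the filter test above.
            Table result = tEnv.sqlQuery("SELECT foo FROM ParquetTable WHERE foo > 100");
            result.printSchema();
        }
    }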