http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
deleted file mode 100644
index 0c9c549..0000000
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/RowOrcInputFormat.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
-import org.apache.orc.RecordReader;
-import org.apache.orc.TypeDescription;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import static org.apache.flink.orc.OrcUtils.fillRows;
-
-/**
- * InputFormat to read ORC data.
- * For optimization, reading is done in batches instead of single rows.
- */
-public class RowOrcInputFormat
-       extends FileInputFormat<Row>
-       implements ResultTypeQueryable<Row> {
-
-       private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class);
-       private static final int BATCH_SIZE = 1024;
-
-       private org.apache.hadoop.conf.Configuration config;
-       private TypeDescription schema;
-       private int[] fieldMapping;
-
-       private transient RowTypeInfo rowType;
-       private transient RecordReader orcRowsReader;
-       private transient VectorizedRowBatch rowBatch;
-       private transient Row[] rows;
-
-       private transient int rowInBatch;
-
-       public RowOrcInputFormat(String path, String schemaString, Configuration orcConfig) {
-               this(path, TypeDescription.fromString(schemaString), orcConfig);
-       }
-
-       public RowOrcInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig) {
-               super(new Path(path));
-               this.unsplittable = false;
-               this.schema = orcSchema;
-               this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
-               this.config = orcConfig;
-
-               this.fieldMapping = new int[this.schema.getChildren().size()];
-               for (int i = 0; i < fieldMapping.length; i++) {
-                       this.fieldMapping[i] = i;
-               }
-
-       }
-
-       public void setFieldMapping(int[] fieldMapping) {
-               this.fieldMapping = fieldMapping;
-               // adapt result type
-
-               TypeInformation[] fieldTypes = new TypeInformation[fieldMapping.length];
-               String[] fieldNames = new String[fieldMapping.length];
-               for (int i = 0; i < fieldMapping.length; i++) {
-                       fieldTypes[i] = this.rowType.getTypeAt(fieldMapping[i]);
-                       fieldNames[i] = this.rowType.getFieldNames()[fieldMapping[i]];
-               }
-               this.rowType = new RowTypeInfo(fieldTypes, fieldNames);
-       }
-
-       private boolean[] computeProjectionMask() {
-               boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
-               for (int inIdx : fieldMapping) {
-                       TypeDescription fieldSchema = schema.getChildren().get(inIdx);
-                       for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
-                               projectionMask[i] = true;
-                       }
-               }
-               return projectionMask;
-       }
-
-       @Override
-       public void openInputFormat() throws IOException {
-               super.openInputFormat();
-               this.rows = new Row[BATCH_SIZE];
-               for (int i = 0; i < BATCH_SIZE; i++) {
-                       rows[i] = new Row(fieldMapping.length);
-               }
-       }
-
-       @Override
-       public void open(FileInputSplit fileSplit) throws IOException {
-
-               this.currentSplit = fileSplit;
-               Preconditions.checkArgument(this.splitStart == 0, "ORC files must be read from the start.");
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Opening ORC file " + fileSplit.getPath());
-               }
-
-               org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(fileSplit.getPath().getPath());
-
-               Reader orcReader = OrcFile.createReader(hPath, OrcFile.readerOptions(config));
-
-               Reader.Options options = orcReader.options()
-                       .range(fileSplit.getStart(), fileSplit.getLength())
-                       .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(config))
-                       .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(config))
-                       .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(config));
-
-               options.include(computeProjectionMask());
-
-               // check that schema of file is as expected
-               if (!this.schema.equals(orcReader.getSchema())) {
-
-                       throw new RuntimeException("Invalid schema for file at " + this.filePath +
-                               " Expected:" + this.schema + " Actual: " + orcReader.getSchema());
-               }
-
-               this.orcRowsReader = orcReader.rows(options);
-
-               // assign ids
-               this.schema.getId();
-
-               this.rowBatch = schema.createRowBatch(BATCH_SIZE);
-               rowInBatch = 0;
-       }
-
-       @Override
-       public void close() throws IOException {
-
-               if (orcRowsReader != null) {
-                       this.orcRowsReader.close();
-               }
-               this.orcRowsReader = null;
-
-       }
-
-       @Override
-       public void closeInputFormat() throws IOException {
-               this.rows = null;
-               this.rows = null;
-               this.schema = null;
-               this.rowBatch = null;
-       }
-
-       @Override
-       public boolean reachedEnd() throws IOException {
-               return !ensureBatch();
-       }
-
-       private boolean ensureBatch() throws IOException {
-
-               if (rowInBatch >= rowBatch.size) {
-                       rowInBatch = 0;
-                       boolean moreRows = orcRowsReader.nextBatch(rowBatch);
-
-                       if (moreRows) {
-                               // read rows
-                               fillRows(rows, schema, rowBatch, fieldMapping);
-                       }
-                       return moreRows;
-               }
-
-               return true;
-       }
-
-       @Override
-       public Row nextRecord(Row reuse) throws IOException {
-               return rows[this.rowInBatch++];
-       }
-
-       @Override
-       public TypeInformation<Row> getProducedType() {
-               return rowType;
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Custom serialization methods
-       // --------------------------------------------------------------------------------------------
-
-       private void writeObject(ObjectOutputStream out) throws IOException {
-               this.config.write(out);
-               out.writeUTF(schema.toString());
-
-               out.writeInt(fieldMapping.length);
-               for (int f : fieldMapping) {
-                       out.writeInt(f);
-               }
-
-       }
-
-       @SuppressWarnings("unchecked")
-       private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-
-               org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
-               configuration.readFields(in);
-
-               if (this.config == null) {
-                       this.config = configuration;
-               }
-               this.schema = TypeDescription.fromString(in.readUTF());
-
-               this.fieldMapping = new int[in.readInt()];
-               for (int i = 0; i < fieldMapping.length; i++) {
-                       this.fieldMapping[i] = in.readInt();
-               }
-
-       }
-
-}

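A note on the read path removed above: RowOrcInputFormat (like the OrcRowInputFormat that replaces it and is exercised by the new tests below) implements Flink's pull-based FileInputFormat contract, where reachedEnd() transparently refills a vectorized ORC batch (BATCH_SIZE = 1024) and nextRecord() hands out one Row per call. The following is a minimal sketch of a driver loop built only from the lifecycle methods visible in this patch; the path and schema string are placeholders:

    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.orc.OrcRowInputFormat;
    import org.apache.flink.types.Row;
    import org.apache.hadoop.conf.Configuration;

    public class OrcReadLoopSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder path and ORC schema string; substitute real values.
            OrcRowInputFormat format = new OrcRowInputFormat(
                "/path/to/file.orc", "struct<_col0:int,_col1:string>", new Configuration());

            format.openInputFormat();                      // per-task setup
            for (FileInputSplit split : format.createInputSplits(1)) {
                format.open(split);                        // seek to the split's stripes
                while (!format.reachedEnd()) {             // refills the row batch as needed
                    Row row = format.nextRecord(null);     // serves one row of the batch
                    System.out.println(row);
                }
                format.close();
            }
            format.closeInputFormat();
        }
    }
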
http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
new file mode 100644
index 0000000..0efe41f
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
@@ -0,0 +1,795 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.orc.Reader;
+import org.apache.orc.StripeInformation;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link OrcRowInputFormat}.
+ */
+public class OrcRowInputFormatTest {
+
+       private OrcRowInputFormat rowOrcInputFormat;
+
+       @After
+       public void tearDown() throws IOException {
+               if (rowOrcInputFormat != null) {
+                       rowOrcInputFormat.close();
+                       rowOrcInputFormat.closeInputFormat();
+               }
+               rowOrcInputFormat = null;
+       }
+
+       private static final String TEST_FILE_FLAT = "test-data-flat.orc";
+       private static final String TEST_SCHEMA_FLAT =
+               "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>";
+
+       private static final String TEST_FILE_NESTED = "test-data-nested.orc";
+       private static final String TEST_SCHEMA_NESTED =
+               "struct<" +
+                       "boolean1:boolean," +
+                       "byte1:tinyint," +
+                       "short1:smallint," +
+                       "int1:int," +
+                       "long1:bigint," +
+                       "float1:float," +
+                       "double1:double," +
+                       "bytes1:binary," +
+                       "string1:string," +
+                       "middle:struct<" +
+                               "list:array<" +
+                                       "struct<" +
+                                               "int1:int," +
+                                               "string1:string" +
+                                       ">" +
+                               ">" +
+                       ">," +
+                       "list:array<" +
+                               "struct<" +
+                                       "int1:int," +
+                                       "string1:string" +
+                               ">" +
+                       ">," +
+                       "map:map<" +
+                               "string," +
+                               "struct<" +
+                                       "int1:int," +
+                                       "string1:string" +
+                               ">" +
+                       ">" +
+               ">";
+
+       private static final String TEST_FILE_TIMETYPES = "test-data-timetypes.orc";
+       private static final String TEST_SCHEMA_TIMETYPES = "struct<time:timestamp,date:date>";
+
+       private static final String TEST_FILE_DECIMAL = "test-data-decimal.orc";
+       private static final String TEST_SCHEMA_DECIMAL = "struct<_col0:decimal(10,5)>";
+
+       private static final String TEST_FILE_NESTEDLIST = "test-data-nestedlist.orc";
+       private static final String TEST_SCHEMA_NESTEDLIST = "struct<mylist1:array<array<struct<mylong1:bigint>>>>";
+
+       @Test(expected = FileNotFoundException.class)
+       public void testInvalidPath() throws IOException{
+               rowOrcInputFormat =
+                       new OrcRowInputFormat("/does/not/exist", 
TEST_SCHEMA_FLAT, new Configuration());
+               rowOrcInputFormat.openInputFormat();
+               FileInputSplit[] inputSplits = 
rowOrcInputFormat.createInputSplits(1);
+               rowOrcInputFormat.open(inputSplits[0]);
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testInvalidProjection1() throws IOException{
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+               int[] projectionMask = {1, 2, 3, -1};
+               rowOrcInputFormat.selectFields(projectionMask);
+       }
+
+       @Test(expected = IndexOutOfBoundsException.class)
+       public void testInvalidProjection2() throws IOException{
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+               int[] projectionMask = {1, 2, 3, 9};
+               rowOrcInputFormat.selectFields(projectionMask);
+       }
+
+       @Test
+       public void testProjectionMaskNested() throws IOException{
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+               OrcRowInputFormat spy = spy(rowOrcInputFormat);
+
+               // mock options to check configuration of ORC reader
+               Reader.Options options = new Reader.Options();
+               doReturn(options).when(spy).getOptions(any());
+
+               spy.selectFields(9, 11, 2);
+               spy.openInputFormat();
+               FileInputSplit[] splits = spy.createInputSplits(1);
+               spy.open(splits[0]);
+
+               // top-level struct is false
+               boolean[] expected = new boolean[]{
+                       false, // top level
+                       false, false, // flat fields 0, 1 are out
+                       true, // flat field 2 is in
+                       false, false, false, false, false, false, // flat fields 3, 4, 5, 6, 7, 8 are out
+                       true, true, true, true, true, // nested field 9 is in
+                       false, false, false, false, // nested field 10 is out
+                       true, true, true, true, true}; // nested field 11 is in
+               assertArrayEquals(expected, options.getInclude());
+       }
+
+       @Test
+       public void testSplitStripesGivenSplits() throws IOException {
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+
+               OrcRowInputFormat spy = spy(rowOrcInputFormat);
+
+               // mock options to check configuration of ORC reader
+               Reader.Options options = spy(new Reader.Options());
+               doReturn(options).when(spy).getOptions(any());
+
+               FileInputSplit[] splits = spy.createInputSplits(3);
+
+               spy.openInputFormat();
+               spy.open(splits[0]);
+               verify(options).range(eq(3L), eq(137005L));
+               spy.open(splits[1]);
+               verify(options).range(eq(137008L), eq(136182L));
+               spy.open(splits[2]);
+               verify(options).range(eq(273190L), eq(123633L));
+       }
+
+       @Test
+       public void testSplitStripesCustomSplits() throws IOException {
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+
+               OrcRowInputFormat spy = spy(rowOrcInputFormat);
+
+               // mock list of stripes
+               List<StripeInformation> stripes = new ArrayList<>();
+               StripeInformation stripe1 = mock(StripeInformation.class);
+               when(stripe1.getOffset()).thenReturn(10L);
+               when(stripe1.getLength()).thenReturn(90L);
+               StripeInformation stripe2 = mock(StripeInformation.class);
+               when(stripe2.getOffset()).thenReturn(100L);
+               when(stripe2.getLength()).thenReturn(100L);
+               StripeInformation stripe3 = mock(StripeInformation.class);
+               when(stripe3.getOffset()).thenReturn(200L);
+               when(stripe3.getLength()).thenReturn(100L);
+               StripeInformation stripe4 = mock(StripeInformation.class);
+               when(stripe4.getOffset()).thenReturn(300L);
+               when(stripe4.getLength()).thenReturn(100L);
+               StripeInformation stripe5 = mock(StripeInformation.class);
+               when(stripe5.getOffset()).thenReturn(400L);
+               when(stripe5.getLength()).thenReturn(100L);
+               stripes.add(stripe1);
+               stripes.add(stripe2);
+               stripes.add(stripe3);
+               stripes.add(stripe4);
+               stripes.add(stripe5);
+               doReturn(stripes).when(spy).getStripes(any());
+
+               // mock options to check configuration of ORC reader
+               Reader.Options options = spy(new Reader.Options());
+               doReturn(options).when(spy).getOptions(any());
+
+               spy.openInputFormat();
+               // split ranging 2 stripes
+               spy.open(new FileInputSplit(0, new Path(getPath(TEST_FILE_FLAT)), 0, 150, new String[]{}));
+               verify(options).range(eq(10L), eq(190L));
+               // split ranging 0 stripes
+               spy.open(new FileInputSplit(1, new Path(getPath(TEST_FILE_FLAT)), 150, 10, new String[]{}));
+               verify(options).range(eq(0L), eq(0L));
+               // split ranging 1 stripe
+               spy.open(new FileInputSplit(2, new Path(getPath(TEST_FILE_FLAT)), 160, 41, new String[]{}));
+               verify(options).range(eq(200L), eq(100L));
+               // split ranging 2 stripes
+               spy.open(new FileInputSplit(3, new Path(getPath(TEST_FILE_FLAT)), 201, 299, new String[]{}));
+               verify(options).range(eq(300L), eq(200L));
+       }
+
+       @Test
+       public void testProducedType() throws IOException {
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+               assertTrue(rowOrcInputFormat.getProducedType() instanceof RowTypeInfo);
+               RowTypeInfo producedType = (RowTypeInfo) rowOrcInputFormat.getProducedType();
+
+               assertArrayEquals(
+                       new TypeInformation[]{
+                               // primitives
+                               Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+                               // binary
+                               PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+                               // string
+                               Types.STRING,
+                               // struct
+                               Types.ROW_NAMED(
+                                       new String[]{"list"},
+                                       ObjectArrayTypeInfo.getInfoFor(
+                                               Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))),
+                               // list
+                               ObjectArrayTypeInfo.getInfoFor(
+                                       Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING)),
+                               // map
+                               new MapTypeInfo<>(Types.STRING, Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))
+                       },
+                       producedType.getFieldTypes());
+               assertArrayEquals(
+                       new String[]{"boolean1", "byte1", "short1", "int1", "long1", "float1", "double1", "bytes1", "string1", "middle", "list", "map"},
+                       producedType.getFieldNames());
+       }
+
+       @Test
+       public void testProducedTypeWithProjection() throws IOException {
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+               rowOrcInputFormat.selectFields(9, 3, 7, 10);
+
+               assertTrue(rowOrcInputFormat.getProducedType() instanceof RowTypeInfo);
+               RowTypeInfo producedType = (RowTypeInfo) rowOrcInputFormat.getProducedType();
+
+               assertArrayEquals(
+                       new TypeInformation[]{
+                               // struct
+                               Types.ROW_NAMED(
+                                       new String[]{"list"},
+                                       ObjectArrayTypeInfo.getInfoFor(
+                                               Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))),
+                               // int
+                               Types.INT,
+                               // binary
+                               PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
+                               // list
+                               ObjectArrayTypeInfo.getInfoFor(
+                                       Types.ROW_NAMED(new String[]{"int1", "string1"}, Types.INT, Types.STRING))
+                       },
+                       producedType.getFieldTypes());
+               assertArrayEquals(
+                       new String[]{"middle", "int1", "bytes1", "list"},
+                       producedType.getFieldNames());
+       }
+
+       @Test
+       public void testSerialization() throws Exception {
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+
+               rowOrcInputFormat.selectFields(0, 4, 1);
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.Equals("_col1", PredicateLeaf.Type.STRING, "M"));
+
+               byte[] bytes = InstantiationUtil.serializeObject(rowOrcInputFormat);
+               OrcRowInputFormat copy = InstantiationUtil.deserializeObject(bytes, getClass().getClassLoader());
+
+               FileInputSplit[] splits = copy.createInputSplits(1);
+               copy.openInputFormat();
+               copy.open(splits[0]);
+               assertFalse(copy.reachedEnd());
+               Row row = copy.nextRecord(null);
+
+               assertNotNull(row);
+               assertEquals(3, row.getArity());
+               // check first row
+               assertEquals(1, row.getField(0));
+               assertEquals(500, row.getField(1));
+               assertEquals("M", row.getField(2));
+       }
+
+       @Test
+       public void testNumericBooleanStringPredicates() throws Exception {
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+               rowOrcInputFormat.selectFields(0, 1, 2, 3, 4, 5, 6, 8);
+
+               // boolean pred
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.Equals("boolean1", PredicateLeaf.Type.BOOLEAN, false));
+               // tinyint pred
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.LessThan("byte1", PredicateLeaf.Type.LONG, 1));
+               // smallint pred
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.LessThanEquals("short1", PredicateLeaf.Type.LONG, 1024));
+               // int pred
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.Between("int1", PredicateLeaf.Type.LONG, -1, 65536));
+               // bigint pred
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.Equals("long1", PredicateLeaf.Type.LONG, 9223372036854775807L));
+               // float pred
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.Equals("float1", PredicateLeaf.Type.FLOAT, 1.0));
+               // double pred
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.Equals("double1", PredicateLeaf.Type.FLOAT, -15.0));
+               // IS NULL pred
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.IsNull("string1", PredicateLeaf.Type.STRING));
+               // string pred
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello"));
+
+               FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+               rowOrcInputFormat.openInputFormat();
+
+               // mock options to check configuration of ORC reader
+               OrcRowInputFormat spy = spy(rowOrcInputFormat);
+               Reader.Options options = new Reader.Options();
+               doReturn(options).when(spy).getOptions(any());
+
+               spy.openInputFormat();
+               spy.open(splits[0]);
+
+               // verify predicate configuration
+               SearchArgument sarg = options.getSearchArgument();
+               assertNotNull(sarg);
+               assertEquals("(and leaf-0 leaf-1 leaf-2 leaf-3 leaf-4 leaf-5 
leaf-6 leaf-7 leaf-8)", sarg.getExpression().toString());
+               assertEquals(9, sarg.getLeaves().size());
+               List<PredicateLeaf> leaves = sarg.getLeaves();
+               assertEquals("(EQUALS boolean1 false)", 
leaves.get(0).toString());
+               assertEquals("(LESS_THAN byte1 1)", leaves.get(1).toString());
+               assertEquals("(LESS_THAN_EQUALS short1 1024)", 
leaves.get(2).toString());
+               assertEquals("(BETWEEN int1 -1 65536)", 
leaves.get(3).toString());
+               assertEquals("(EQUALS long1 9223372036854775807)", 
leaves.get(4).toString());
+               assertEquals("(EQUALS float1 1.0)", leaves.get(5).toString());
+               assertEquals("(EQUALS double1 -15.0)", 
leaves.get(6).toString());
+               assertEquals("(IS_NULL string1)", leaves.get(7).toString());
+               assertEquals("(EQUALS string1 hello)", 
leaves.get(8).toString());
+       }
+
+       @Test
+       public void testTimePredicates() throws Exception {
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration());
+
+               rowOrcInputFormat.addPredicate(
+                       // OR
+                       new OrcRowInputFormat.Or(
+                               // timestamp pred
+                               new OrcRowInputFormat.Equals("time", 
PredicateLeaf.Type.TIMESTAMP, Timestamp.valueOf("1900-05-05 12:34:56.100")),
+                               // date pred
+                               new OrcRowInputFormat.Equals("date", 
PredicateLeaf.Type.DATE, Date.valueOf("1900-12-25")))
+                       );
+
+               FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+               rowOrcInputFormat.openInputFormat();
+
+               // mock options to check configuration of ORC reader
+               OrcRowInputFormat spy = spy(rowOrcInputFormat);
+               Reader.Options options = new Reader.Options();
+               doReturn(options).when(spy).getOptions(any());
+
+               spy.openInputFormat();
+               spy.open(splits[0]);
+
+               // verify predicate configuration
+               SearchArgument sarg = options.getSearchArgument();
+               assertNotNull(sarg);
+               assertEquals("(or leaf-0 leaf-1)", 
sarg.getExpression().toString());
+               assertEquals(2, sarg.getLeaves().size());
+               List<PredicateLeaf> leaves = sarg.getLeaves();
+               assertEquals("(EQUALS time 1900-05-05 12:34:56.1)", 
leaves.get(0).toString());
+               assertEquals("(EQUALS date 1900-12-25)", 
leaves.get(1).toString());
+       }
+
+       @Test
+       public void testDecimalPredicate() throws Exception {
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration());
+
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.Not(
+                               // decimal pred
+                               new OrcRowInputFormat.Equals("_col0", 
PredicateLeaf.Type.DECIMAL, BigDecimal.valueOf(-1000.5))));
+
+               FileInputSplit[] splits = 
rowOrcInputFormat.createInputSplits(1);
+               rowOrcInputFormat.openInputFormat();
+
+               // mock options to check configuration of ORC reader
+               OrcRowInputFormat spy = spy(rowOrcInputFormat);
+               Reader.Options options = new Reader.Options();
+               doReturn(options).when(spy).getOptions(any());
+
+               spy.openInputFormat();
+               spy.open(splits[0]);
+
+               // verify predicate configuration
+               SearchArgument sarg = options.getSearchArgument();
+               assertNotNull(sarg);
+               assertEquals("(not leaf-0)", sarg.getExpression().toString());
+               assertEquals(1, sarg.getLeaves().size());
+               List<PredicateLeaf> leaves = sarg.getLeaves();
+               assertEquals("(EQUALS _col0 -1000.5)", 
leaves.get(0).toString());
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testPredicateWithInvalidColumn() throws Exception {
+               rowOrcInputFormat =
+                       new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.Equals("unknown", PredicateLeaf.Type.LONG, 42));
+       }
+
+       @Test
+       public void testReadNestedFile() throws IOException{
+               rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+               FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+               assertEquals(1, splits.length);
+               rowOrcInputFormat.openInputFormat();
+               rowOrcInputFormat.open(splits[0]);
+
+               assertFalse(rowOrcInputFormat.reachedEnd());
+               Row row = rowOrcInputFormat.nextRecord(null);
+
+               // validate first row
+               assertNotNull(row);
+               assertEquals(12, row.getArity());
+               assertEquals(false, row.getField(0));
+               assertEquals((byte) 1, row.getField(1));
+               assertEquals((short) 1024, row.getField(2));
+               assertEquals(65536, row.getField(3));
+               assertEquals(9223372036854775807L, row.getField(4));
+               assertEquals(1.0f, row.getField(5));
+               assertEquals(-15.0d, row.getField(6));
+               assertArrayEquals(new byte[]{0, 1, 2, 3, 4}, (byte[]) row.getField(7));
+               assertEquals("hi", row.getField(8));
+               // check nested field
+               assertTrue(row.getField(9) instanceof Row);
+               Row nested1 = (Row) row.getField(9);
+               assertEquals(1, nested1.getArity());
+               assertTrue(nested1.getField(0) instanceof Object[]);
+               Object[] nestedList1 = (Object[]) nested1.getField(0);
+               assertEquals(2, nestedList1.length);
+               assertEquals(Row.of(1, "bye"), nestedList1[0]);
+               assertEquals(Row.of(2, "sigh"), nestedList1[1]);
+               // check list
+               assertTrue(row.getField(10) instanceof Object[]);
+               Object[] list1 = (Object[]) row.getField(10);
+               assertEquals(2, list1.length);
+               assertEquals(Row.of(3, "good"), list1[0]);
+               assertEquals(Row.of(4, "bad"), list1[1]);
+               // check map
+               assertTrue(row.getField(11) instanceof HashMap);
+               HashMap map1 = (HashMap) row.getField(11);
+               assertEquals(0, map1.size());
+
+               // read second row
+               assertFalse(rowOrcInputFormat.reachedEnd());
+               row = rowOrcInputFormat.nextRecord(null);
+
+               // validate second row
+               assertNotNull(row);
+               assertEquals(12, row.getArity());
+               assertEquals(true, row.getField(0));
+               assertEquals((byte) 100, row.getField(1));
+               assertEquals((short) 2048, row.getField(2));
+               assertEquals(65536, row.getField(3));
+               assertEquals(9223372036854775807L, row.getField(4));
+               assertEquals(2.0f, row.getField(5));
+               assertEquals(-5.0d, row.getField(6));
+               assertArrayEquals(new byte[]{}, (byte[]) row.getField(7));
+               assertEquals("bye", row.getField(8));
+               // check nested field
+               assertTrue(row.getField(9) instanceof Row);
+               Row nested2 = (Row) row.getField(9);
+               assertEquals(1, nested2.getArity());
+               assertTrue(nested2.getField(0) instanceof Object[]);
+               Object[] nestedList2 = (Object[]) nested2.getField(0);
+               assertEquals(2, nestedList2.length);
+               assertEquals(Row.of(1, "bye"), nestedList2[0]);
+               assertEquals(Row.of(2, "sigh"), nestedList2[1]);
+               // check list
+               assertTrue(row.getField(10) instanceof Object[]);
+               Object[] list2 = (Object[]) row.getField(10);
+               assertEquals(3, list2.length);
+               assertEquals(Row.of(100000000, "cat"), list2[0]);
+               assertEquals(Row.of(-100000, "in"), list2[1]);
+               assertEquals(Row.of(1234, "hat"), list2[2]);
+               // check map
+               assertTrue(row.getField(11) instanceof HashMap);
+               HashMap map = (HashMap) row.getField(11);
+               assertEquals(2, map.size());
+               assertEquals(Row.of(5, "chani"), map.get("chani"));
+               assertEquals(Row.of(1, "mauddib"), map.get("mauddib"));
+
+               assertTrue(rowOrcInputFormat.reachedEnd());
+       }
+
+       @Test
+       public void testReadTimeTypeFile() throws IOException{
+               rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration());
+
+               FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+               assertEquals(1, splits.length);
+               rowOrcInputFormat.openInputFormat();
+               rowOrcInputFormat.open(splits[0]);
+
+               assertFalse(rowOrcInputFormat.reachedEnd());
+               Row row = rowOrcInputFormat.nextRecord(null);
+
+               // validate first row
+               assertNotNull(row);
+               assertEquals(2, row.getArity());
+               assertEquals(Timestamp.valueOf("1900-05-05 12:34:56.1"), row.getField(0));
+               assertEquals(Date.valueOf("1900-12-25"), row.getField(1));
+
+               // check correct number of rows
+               long cnt = 1;
+               while (!rowOrcInputFormat.reachedEnd()) {
+                       assertNotNull(rowOrcInputFormat.nextRecord(null));
+                       cnt++;
+               }
+               assertEquals(70000, cnt);
+       }
+
+       @Test
+       public void testReadDecimalTypeFile() throws IOException{
+               rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration());
+
+               FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+               assertEquals(1, splits.length);
+               rowOrcInputFormat.openInputFormat();
+               rowOrcInputFormat.open(splits[0]);
+
+               assertFalse(rowOrcInputFormat.reachedEnd());
+               Row row = rowOrcInputFormat.nextRecord(null);
+
+               // validate first row
+               assertNotNull(row);
+               assertEquals(1, row.getArity());
+               assertEquals(BigDecimal.valueOf(-1000.5d), row.getField(0));
+
+               // check correct number of rows
+               long cnt = 1;
+               while (!rowOrcInputFormat.reachedEnd()) {
+                       assertNotNull(rowOrcInputFormat.nextRecord(null));
+                       cnt++;
+               }
+               assertEquals(6000, cnt);
+       }
+
+       @Test
+       public void testReadNestedListFile() throws Exception {
+               rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTEDLIST), TEST_SCHEMA_NESTEDLIST, new Configuration());
+
+               FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+               assertEquals(1, splits.length);
+               rowOrcInputFormat.openInputFormat();
+               rowOrcInputFormat.open(splits[0]);
+
+               assertFalse(rowOrcInputFormat.reachedEnd());
+
+               Row row = null;
+               long cnt = 0;
+
+               // read all rows
+               while (!rowOrcInputFormat.reachedEnd()) {
+
+                       row = rowOrcInputFormat.nextRecord(row);
+                       assertEquals(1, row.getArity());
+
+                       // outer list
+                       Object[] list = (Object[]) row.getField(0);
+                       assertEquals(1, list.length);
+
+                       // nested list of rows
+                       Row[] nestedRows = (Row[]) list[0];
+                       assertEquals(1, nestedRows.length);
+                       assertEquals(1, nestedRows[0].getArity());
+
+                       // verify list value
+                       assertEquals(cnt, nestedRows[0].getField(0));
+                       cnt++;
+               }
+               // number of rows in file
+               assertEquals(100, cnt);
+       }
+
+       @Test
+       public void testReadWithProjection() throws IOException{
+               rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
+
+               rowOrcInputFormat.selectFields(7, 0, 10, 8);
+
+               FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+               assertEquals(1, splits.length);
+               rowOrcInputFormat.openInputFormat();
+               rowOrcInputFormat.open(splits[0]);
+
+               assertFalse(rowOrcInputFormat.reachedEnd());
+               Row row = rowOrcInputFormat.nextRecord(null);
+
+               // validate first row
+               assertNotNull(row);
+               assertEquals(4, row.getArity());
+               // check binary
+               assertArrayEquals(new byte[]{0, 1, 2, 3, 4}, (byte[]) row.getField(0));
+               // check boolean
+               assertEquals(false, row.getField(1));
+               // check list
+               assertTrue(row.getField(2) instanceof Object[]);
+               Object[] list1 = (Object[]) row.getField(2);
+               assertEquals(2, list1.length);
+               assertEquals(Row.of(3, "good"), list1[0]);
+               assertEquals(Row.of(4, "bad"), list1[1]);
+               // check string
+               assertEquals("hi", row.getField(3));
+
+               // check that there is a second row with four fields
+               assertFalse(rowOrcInputFormat.reachedEnd());
+               row = rowOrcInputFormat.nextRecord(null);
+               assertNotNull(row);
+               assertEquals(4, row.getArity());
+               assertTrue(rowOrcInputFormat.reachedEnd());
+       }
+
+       @Test
+       public void testReadFileInSplits() throws IOException{
+
+               rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+               rowOrcInputFormat.selectFields(0, 1);
+
+               FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(4);
+               assertEquals(4, splits.length);
+               rowOrcInputFormat.openInputFormat();
+
+               long cnt = 0;
+               // read all splits
+               for (FileInputSplit split : splits) {
+
+                       // open split
+                       rowOrcInputFormat.open(split);
+                       // read and count all rows
+                       while (!rowOrcInputFormat.reachedEnd()) {
+                               assertNotNull(rowOrcInputFormat.nextRecord(null));
+                               cnt++;
+                       }
+               }
+               // check that all rows have been read
+               assertEquals(1920800, cnt);
+       }
+
+       @Test
+       public void testReadFileWithFilter() throws IOException{
+
+               rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
+               rowOrcInputFormat.selectFields(0, 1);
+
+               // read head and tail of file
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.Or(
+                               new OrcRowInputFormat.LessThan("_col0", PredicateLeaf.Type.LONG, 10L),
+                               new OrcRowInputFormat.Not(
+                                       new OrcRowInputFormat.LessThanEquals("_col0", PredicateLeaf.Type.LONG, 1920000L))
+                       ));
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.Equals("_col1", PredicateLeaf.Type.STRING, "M"));
+
+               FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+               assertEquals(1, splits.length);
+               rowOrcInputFormat.openInputFormat();
+
+               // open split
+               rowOrcInputFormat.open(splits[0]);
+
+               // read and count all rows
+               long cnt = 0;
+               while (!rowOrcInputFormat.reachedEnd()) {
+                       assertNotNull(rowOrcInputFormat.nextRecord(null));
+                       cnt++;
+               }
+               // check that only the first and last stripes of the file have been read.
+               // Each stripe has 5000 rows, except the last which has 800 rows.
+               assertEquals(5800, cnt);
+       }
+
+       @Test
+       public void testReadFileWithEvolvedSchema() throws IOException{
+
+               rowOrcInputFormat = new OrcRowInputFormat(
+                       getPath(TEST_FILE_FLAT),
+                       "struct<_col0:int,_col1:string,_col4:string,_col3:string>", // previous version of schema
+                       new Configuration());
+               rowOrcInputFormat.selectFields(3, 0, 2);
+
+               rowOrcInputFormat.addPredicate(
+                       new OrcRowInputFormat.LessThan("_col0", PredicateLeaf.Type.LONG, 10L));
+
+               FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+               assertEquals(1, splits.length);
+               rowOrcInputFormat.openInputFormat();
+
+               // open split
+               rowOrcInputFormat.open(splits[0]);
+
+               // read and validate first row
+               assertFalse(rowOrcInputFormat.reachedEnd());
+               Row row = rowOrcInputFormat.nextRecord(null);
+               assertNotNull(row);
+               assertEquals(3, row.getArity());
+               assertEquals("Primary", row.getField(0));
+               assertEquals(1, row.getField(1));
+               assertEquals("M", row.getField(2));
+
+               // read and count remaining rows
+               long cnt = 1;
+               while (!rowOrcInputFormat.reachedEnd()) {
+                       assertNotNull(rowOrcInputFormat.nextRecord(null));
+                       cnt++;
+               }
+               // check that only the first stripe of the file has been read.
+               // The stripe has 5000 rows.
+               assertEquals(5000, cnt);
+       }
+
+       private String getPath(String fileName) {
+               return getClass().getClassLoader().getResource(fileName).getPath();
+       }
+}

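The tests above revolve around the two push-down features of the new OrcRowInputFormat: column projection via selectFields(...) and filter push-down via addPredicate(...). A condensed usage sketch under the same assumptions as the tests (the fixture file test-data-flat.orc and its _colN schema):

    import org.apache.flink.orc.OrcRowInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

    public class OrcPushDownSketch {
        public static OrcRowInputFormat configure(String path) {
            OrcRowInputFormat format = new OrcRowInputFormat(
                path,
                "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>",
                new Configuration());

            // Read only the first two columns of the ORC schema.
            format.selectFields(0, 1);

            // Push (_col0 < 10 OR NOT _col0 <= 1920000) AND _col1 = 'M' into the
            // ORC reader as a SearchArgument, so stripes whose statistics cannot
            // match are skipped entirely (cf. testReadFileWithFilter above).
            format.addPredicate(
                new OrcRowInputFormat.Or(
                    new OrcRowInputFormat.LessThan("_col0", PredicateLeaf.Type.LONG, 10L),
                    new OrcRowInputFormat.Not(
                        new OrcRowInputFormat.LessThanEquals("_col0", PredicateLeaf.Type.LONG, 1920000L))));
            format.addPredicate(
                new OrcRowInputFormat.Equals("_col1", PredicateLeaf.Type.STRING, "M"));
            return format;
        }
    }
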
http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
index 3de6ab3..e6ef1e1 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceITCase.java
@@ -18,125 +18,101 @@
 
 package org.apache.flink.orc;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.Row;
 
-import org.hamcrest.CoreMatchers;
-import org.junit.Assert;
 import org.junit.Test;
 
-import java.net.URL;
-import java.util.ArrayList;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+
 /**
  * Tests for {@link OrcTableSource}.
  */
 public class OrcTableSourceITCase extends MultipleProgramsTestBase {
 
-       private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
-               "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
-               "middle:struct<list:array<struct<int1:int,string1:string>>>," +
-               "list:array<struct<int1:int,string1:string>>," +
-               "map:map<string,struct<int1:int,string1:string>>>";
-
-       private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
-
-
-       private static final String[] TEST1_DATA = new String[] {
-               "false,1,1024,65536,9223372036854775807,1.0,-15.0,[0, 1, 2, 3, 4],hi,[1,bye, 2,sigh],[3,good, 4,bad],{}",
-               "true,100,2048,65536,9223372036854775807,2.0,-5.0,[],bye,[1,bye, 2,sigh]," +
-                       "[100000000,cat, -100000,in, 1234,hat],{chani=5,chani, mauddib=1,mauddib}" };
+       private static final String TEST_FILE_FLAT = "test-data-flat.orc";
+       private static final String TEST_SCHEMA_FLAT =
+               "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int,_col5:string,_col6:int,_col7:int,_col8:int>";
 
        public OrcTableSourceITCase() {
                super(TestExecutionMode.COLLECTION);
        }
 
        @Test
-       public void testOrcTableSource() throws Exception {
+       public void testFullScan() throws Exception {
 
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
 
-               assert (test1URL != null);
-               OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
-
-               tEnv.registerTableSource("orcTable", orc);
-
-               String query = "Select * from orcTable";
-               Table t = tEnv.sql(query);
+               OrcTableSource orc = OrcTableSource.builder()
+                       .path(getPath(TEST_FILE_FLAT))
+                       .forOrcSchema(TEST_SCHEMA_FLAT)
+                       .build();
+               tEnv.registerTableSource("OrcTable", orc);
+
+               String query =
+                       "SELECT COUNT(*), " +
+                               "MIN(_col0), MAX(_col0), " +
+                               "MIN(_col1), MAX(_col1), " +
+                               "MIN(_col2), MAX(_col2), " +
+                               "MIN(_col3), MAX(_col3), " +
+                               "MIN(_col4), MAX(_col4), " +
+                               "MIN(_col5), MAX(_col5), " +
+                               "MIN(_col6), MAX(_col6), " +
+                               "MIN(_col7), MAX(_col7), " +
+                               "MIN(_col8), MAX(_col8) " +
+                       "FROM OrcTable";
+               Table t = tEnv.sqlQuery(query);
 
                DataSet<Row> dataSet = tEnv.toDataSet(t, Row.class);
-               List<Row> records = dataSet.collect();
-
-               Assert.assertEquals(records.size(), 2);
+               List<Row> result = dataSet.collect();
 
-               List<String> actualRecords = new ArrayList<>();
-               for (Row record : records) {
-                       Assert.assertEquals(record.getArity(), 12);
-                       actualRecords.add(record.toString());
-               }
-
-               Assert.assertThat(actualRecords, CoreMatchers.hasItems(TEST1_DATA));
+               assertEquals(1, result.size());
+               assertEquals(
+                       "1920800,1,1920800,F,M,D,W,2 yr 
Degree,Unknown,500,10000,Good,Unknown,0,6,0,6,0,6",
+                       result.get(0).toString());
        }
 
        @Test
-       public void testOrcTableProjection() throws Exception {
+       public void testScanWithProjectionAndFilter() throws Exception {
 
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
 
-               assert(test1URL != null);
-               OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
-
-               tEnv.registerTableSource("orcTable", orc);
-
-               String query = "Select middle,list,map from orcTable";
-               Table t = tEnv.sql(query);
-
-               String[] colNames = new String[] {"middle", "list", "map"};
-
-               RowTypeInfo rowTypeInfo = new RowTypeInfo(
-                       new TypeInformation[] {
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO},
-                       new String[] {"int1", "string1"});
-
-               RowTypeInfo structTypeInfo = new RowTypeInfo(
-                       new TypeInformation[] {ObjectArrayTypeInfo.getInfoFor(rowTypeInfo)},
-                       new String[] {"list"});
-
-               TypeInformation[] colTypes = new TypeInformation[] {
-                       structTypeInfo,
-                       ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
-                       new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, rowTypeInfo)
-               };
-
-               TableSchema actualTableSchema = new TableSchema(colNames, colTypes);
-
-               Assert.assertArrayEquals(t.getSchema().getColumnNames(), colNames);
-               Assert.assertArrayEquals(t.getSchema().getTypes(), colTypes);
-               Assert.assertEquals(actualTableSchema.toString(), t.getSchema().toString());
+               OrcTableSource orc = OrcTableSource.builder()
+                       .path(getPath(TEST_FILE_FLAT))
+                       .forOrcSchema(TEST_SCHEMA_FLAT)
+                       .build();
+               tEnv.registerTableSource("OrcTable", orc);
+
+               String query =
+                       "SELECT " +
+                               "MIN(_col4), MAX(_col4), " +
+                               "MIN(_col3), MAX(_col3), " +
+                               "MIN(_col0), MAX(_col0), " +
+                               "MIN(_col2), MAX(_col2), " +
+                               "COUNT(*) " +
+                               "FROM OrcTable " +
+                               "WHERE (_col0 BETWEEN 4975 and 5024 OR _col0 
BETWEEN 9975 AND 10024) AND _col1 = 'F'";
+               Table t = tEnv.sqlQuery(query);
 
                DataSet<Row> dataSet = tEnv.toDataSet(t, Row.class);
-               List<Row> records = dataSet.collect();
-
-               Assert.assertEquals(records.size(), 2);
-               for (Row record: records) {
-                       Assert.assertEquals(record.getArity(), 3);
-               }
+               List<Row> result = dataSet.collect();
 
+               assertEquals(1, result.size());
+               assertEquals(
+                       "1500,6000,2 yr Degree,Unknown,4976,10024,D,W,50",
+                       result.get(0).toString());
        }
 
+       private String getPath(String fileName) {
+               return getClass().getClassLoader().getResource(fileName).getPath();
+       }
 }

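For reference, this is the end-to-end usage pattern the integration test above exercises. A minimal, self-contained sketch of the same flow (the file path and ORC schema here are illustrative placeholders, not resources shipped with this change):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.orc.OrcTableSource;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.types.Row;

    public class OrcTableSourceUsageSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

            // Placeholder path and schema; substitute a real ORC file and its schema string.
            OrcTableSource orc = OrcTableSource.builder()
                .path("/tmp/data.orc")
                .forOrcSchema("struct<id:int,name:string>")
                .build();

            tEnv.registerTableSource("OrcTable", orc);
            Table t = tEnv.sqlQuery("SELECT id, name FROM OrcTable WHERE id > 100");
            for (Row row : tEnv.toDataSet(t, Row.class).collect()) {
                System.out.println(row);
            }
        }
    }

Projection and filter pushdown then happen transparently through projectFields() and applyPredicate(), as the unit tests below verify.
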
http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
index c285054..b654f76 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcTableSourceTest.java
@@ -18,96 +18,248 @@
 
 package org.apache.flink.orc;
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.expressions.ArrayElementAt;
+import org.apache.flink.table.expressions.EqualTo;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.GetCompositeField;
+import org.apache.flink.table.expressions.GreaterThan;
+import org.apache.flink.table.expressions.Literal;
+import org.apache.flink.table.expressions.ResolvedFieldReference;
+import org.apache.flink.types.Row;
 
-import org.junit.Assert;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
-import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 /**
  * Unit Tests for {@link OrcTableSource}.
  */
 public class OrcTableSourceTest {
 
-       private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
-               "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
-               "middle:struct<list:array<struct<int1:int,string1:string>>>," +
-               "list:array<struct<int1:int,string1:string>>," +
-               "map:map<string,struct<int1:int,string1:string>>>";
+       private static final String TEST_FILE_NESTED = "test-data-nested.orc";
+       private static final String TEST_SCHEMA_NESTED =
+               "struct<" +
+                       "boolean1:boolean," +
+                       "byte1:tinyint," +
+                       "short1:smallint," +
+                       "int1:int," +
+                       "long1:bigint," +
+                       "float1:float," +
+                       "double1:double," +
+                       "bytes1:binary," +
+                       "string1:string," +
+                       "middle:struct<" +
+                               "list:array<" +
+                                       "struct<" +
+                                               "int1:int," +
+                                               "string1:string" +
+                                       ">" +
+                               ">" +
+                       ">," +
+                       "list:array<" +
+                               "struct<" +
+                                       "int1:int," +
+                                       "string1:string" +
+                               ">" +
+                       ">," +
+                       "map:map<" +
+                               "string," +
+                               "struct<" +
+                                       "int1:int," +
+                                       "string1:string" +
+                               ">" +
+                       ">" +
+               ">";
+
+       @Test
+       public void testGetReturnType() throws Exception {
+
+               OrcTableSource orc = OrcTableSource.builder()
+                       .path(getPath(TEST_FILE_NESTED))
+                       .forOrcSchema(TEST_SCHEMA_NESTED)
+                       .build();
 
-       private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
+               TypeInformation<Row> returnType = orc.getReturnType();
+               assertNotNull(returnType);
+               assertTrue(returnType instanceof RowTypeInfo);
+               RowTypeInfo rowType = (RowTypeInfo) returnType;
+
+               RowTypeInfo expected = Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes());
+               assertEquals(expected, rowType);
+       }
 
        @Test
-       public void testOrcSchema() throws Exception {
+       public void testGetTableSchema() throws Exception {
+
+               OrcTableSource orc = OrcTableSource.builder()
+                       .path(getPath(TEST_FILE_NESTED))
+                       .forOrcSchema(TEST_SCHEMA_NESTED)
+                       .build();
+
+               TableSchema schema = orc.getTableSchema();
+               assertNotNull(schema);
+               assertArrayEquals(getNestedFieldNames(), schema.getColumnNames());
+               assertArrayEquals(getNestedFieldTypes(), schema.getTypes());
+       }
+
+       @Test
+       public void testProjectFields() throws Exception {
+
+               OrcTableSource orc = OrcTableSource.builder()
+                       .path(getPath(TEST_FILE_NESTED))
+                       .forOrcSchema(TEST_SCHEMA_NESTED)
+                       .build();
+
+               OrcTableSource projected = (OrcTableSource) orc.projectFields(new int[]{3, 5, 1, 0});
 
-               assert(test1URL != null);
-               OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
+               // ensure copy is returned
+               assertTrue(orc != projected);
 
-               String expectedSchema = "Row(boolean1: Boolean, byte1: Byte, short1: Short, int1: Integer, long1: Long, " +
-                       "float1: Float, double1: Double, bytes1: byte[], string1: String, " +
-                       "middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>), " +
-                       "list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>, " +
-                       "map: Map<String, Row(int1: Integer, string1: String)>)";
+               // ensure table schema is identical
+               assertEquals(orc.getTableSchema(), projected.getTableSchema());
 
-               Assert.assertEquals(expectedSchema, orc.getReturnType().toString());
+               // ensure return type was adapted
+               String[] fieldNames = getNestedFieldNames();
+               TypeInformation[] fieldTypes = getNestedFieldTypes();
+               assertEquals(
+                       Types.ROW_NAMED(
+                               new String[] {fieldNames[3], fieldNames[5], fieldNames[1], fieldNames[0]},
+                               new TypeInformation[] {fieldTypes[3], fieldTypes[5], fieldTypes[1], fieldTypes[0]}),
+                       projected.getReturnType());
 
+               // ensure IF is configured with selected fields
+               OrcTableSource spyTS = spy(projected);
+               OrcRowInputFormat mockIF = mock(OrcRowInputFormat.class);
+               doReturn(mockIF).when(spyTS).buildOrcInputFormat();
+               spyTS.getDataSet(mock(ExecutionEnvironment.class));
+               verify(mockIF).selectFields(eq(3), eq(5), eq(1), eq(0));
        }
 
        @Test
-       public void testOrcTableSchema() throws Exception {
+       public void testApplyPredicate() throws Exception {
 
-               ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-               BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+               OrcTableSource orc = OrcTableSource.builder()
+                       .path(getPath(TEST_FILE_NESTED))
+                       .forOrcSchema(TEST_SCHEMA_NESTED)
+                       .build();
 
-               assert(test1URL != null);
-               OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
+               // expressions for predicates
+               Expression pred1 = new GreaterThan(
+                       new ResolvedFieldReference("int1", Types.INT),
+                       new Literal(100, Types.INT));
+               Expression pred2 = new EqualTo(
+                       new ResolvedFieldReference("string1", Types.STRING),
+                       new Literal("hello", Types.STRING));
+               Expression pred3 = new EqualTo(
+                       new GetCompositeField(
+                               new ArrayElementAt(
+                                       new ResolvedFieldReference(
+                                               "list",
+                                               ObjectArrayTypeInfo.getInfoFor(
+                                                       Types.ROW_NAMED(new String[] {"int1", "string1"}, Types.INT, Types.STRING))),
+                                       new Literal(1, Types.INT)),
+                               "int1"),
+                       new Literal(1, Types.INT)
+                       );
+               ArrayList<Expression> preds = new ArrayList<>();
+               preds.add(pred1);
+               preds.add(pred2);
+               preds.add(pred3);
 
-               tEnv.registerTableSource("orcTable", orc);
-               String query = "Select * from orcTable";
-               Table t = tEnv.sql(query);
+               // apply predicates on TableSource
+               OrcTableSource projected = (OrcTableSource) orc.applyPredicate(preds);
 
-               String[] colNames = new String[] {
-                       "boolean1", "byte1", "short1", "int1", "long1", 
"float1",
-                       "double1", "bytes1", "string1", "list", "list0", "map"
-               };
+               // ensure copy is returned
+               assertTrue(orc != projected);
 
-               RowTypeInfo rowTypeInfo = new RowTypeInfo(
-                       new TypeInformation[] {
-                               BasicTypeInfo.INT_TYPE_INFO,
-                               BasicTypeInfo.STRING_TYPE_INFO},
-                       new String[] {"int1", "string1"});
-
-               TypeInformation[] colTypes = new TypeInformation[] {
-                       BasicTypeInfo.BOOLEAN_TYPE_INFO,
-                       BasicTypeInfo.BYTE_TYPE_INFO,
-                       BasicTypeInfo.SHORT_TYPE_INFO,
-                       BasicTypeInfo.INT_TYPE_INFO,
-                       BasicTypeInfo.LONG_TYPE_INFO,
-                       BasicTypeInfo.FLOAT_TYPE_INFO,
-                       BasicTypeInfo.DOUBLE_TYPE_INFO,
-                       PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
-                       BasicTypeInfo.STRING_TYPE_INFO,
-                       ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
-                       ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
-                       new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, rowTypeInfo)
-               };
-               TableSchema expectedTableSchema = new TableSchema(colNames, colTypes);
+               // ensure table schema is identical
+               assertEquals(orc.getTableSchema(), projected.getTableSchema());
+
+               // ensure return type is identical
+               assertEquals(
+                       Types.ROW_NAMED(getNestedFieldNames(), getNestedFieldTypes()),
+                       projected.getReturnType());
+
+               // ensure IF is configured with supported predicates
+               OrcTableSource spyTS = spy(projected);
+               OrcRowInputFormat mockIF = mock(OrcRowInputFormat.class);
+               doReturn(mockIF).when(spyTS).buildOrcInputFormat();
+               spyTS.getDataSet(mock(ExecutionEnvironment.class));
+
+               ArgumentCaptor<OrcRowInputFormat.Predicate> arguments = ArgumentCaptor.forClass(OrcRowInputFormat.Predicate.class);
+               verify(mockIF, times(2)).addPredicate(arguments.capture());
+               List<String> values = arguments.getAllValues().stream().map(Object::toString).collect(Collectors.toList());
+               assertTrue(values.contains(
+                       new OrcRowInputFormat.Not(new OrcRowInputFormat.LessThanEquals("int1", PredicateLeaf.Type.LONG, 100)).toString()));
+               assertTrue(values.contains(
+                       new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello").toString()));
+
+               // ensure filter pushdown is correct
+               assertTrue(spyTS.isFilterPushedDown());
+               assertFalse(orc.isFilterPushedDown());
+       }
+
+       private String getPath(String fileName) {
+               return getClass().getClassLoader().getResource(fileName).getPath();
+       }
 
-               Assert.assertArrayEquals(t.getSchema().getColumnNames(), colNames);
-               Assert.assertArrayEquals(t.getSchema().getTypes(), colTypes);
-               Assert.assertEquals(expectedTableSchema.toString(), t.getSchema().toString());
+       private String[] getNestedFieldNames() {
+               return new String[] {
+                       "boolean1", "byte1", "short1", "int1", "long1", 
"float1", "double1", "bytes1", "string1", "middle", "list", "map"
+               };
+       }
 
+       private TypeInformation[] getNestedFieldTypes() {
+               return new TypeInformation[]{
+                       Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+                       PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
+                       Types.ROW_NAMED(
+                               new String[]{"list"},
+                               ObjectArrayTypeInfo.getInfoFor(
+                                       Types.ROW_NAMED(
+                                               new String[]{"int1", "string1"},
+                                               Types.INT, Types.STRING
+                                       )
+                               )
+                       ),
+                       ObjectArrayTypeInfo.getInfoFor(
+                               Types.ROW_NAMED(
+                                       new String[]{"int1", "string1"},
+                                       Types.INT, Types.STRING
+                               )
+                       ),
+                       new MapTypeInfo<>(
+                               Types.STRING,
+                               Types.ROW_NAMED(
+                                       new String[]{"int1", "string1"},
+                                       Types.INT, Types.STRING
+                               )
+                       )
+               };
        }
 
 }

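Two details of the pushdown contract that testApplyPredicate() pins down are easy to miss: a strict greater-than reaches the input format rewritten as a negated less-than-or-equal, and predicates without a flat leaf encoding (here, the one on list[1].int1) are dropped, which is why exactly two addPredicate() calls are verified. A sketch of the two surviving predicates, built from the same classes the assertions use (the wrapping class is illustrative scaffolding, not part of this change):

    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

    public class PredicatePushdownSketch {
        public static void main(String[] args) {
            // `int1 > 100` is expected to arrive as NOT(int1 <= 100).
            OrcRowInputFormat.Predicate greaterThan = new OrcRowInputFormat.Not(
                new OrcRowInputFormat.LessThanEquals("int1", PredicateLeaf.Type.LONG, 100));
            // `string1 = 'hello'` is expected to arrive as a plain EQUALS leaf.
            OrcRowInputFormat.Predicate equalsHello =
                new OrcRowInputFormat.Equals("string1", PredicateLeaf.Type.STRING, "hello");
            System.out.println(greaterThan);
            System.out.println(equalsHello);
        }
    }
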
http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
new file mode 100644
index 0000000..2cb1715
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link OrcUtils}.
+ *
+ */
+public class OrcUtilsTest {
+
+       @Test
+       public void testFlatSchemaToTypeInfo1() {
+
+               String schema =
+                       "struct<" +
+                               "boolean1:boolean," +
+                               "byte1:tinyint," +
+                               "short1:smallint," +
+                               "int1:int," +
+                               "long1:bigint," +
+                               "float1:float," +
+                               "double1:double," +
+                               "bytes1:binary," +
+                               "string1:string," +
+                               "date1:date," +
+                               "timestamp1:timestamp," +
+                               "decimal1:decimal(5,2)" +
+                       ">";
+               TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+               Assert.assertNotNull(typeInfo);
+               Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+               RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+               // validate field types
+               Assert.assertArrayEquals(
+                       new TypeInformation[]{
+                               Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+                               PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
+                               Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO
+                       },
+                       rowTypeInfo.getFieldTypes());
+
+               // validate field names
+               Assert.assertArrayEquals(
+                       new String[] {
+                               "boolean1", "byte1", "short1", "int1", "long1", 
"float1", "double1",
+                               "bytes1", "string1", "date1", "timestamp1", 
"decimal1"
+                       },
+                       rowTypeInfo.getFieldNames());
+
+       }
+
+       @Test
+       public void testNestedSchemaToTypeInfo1() {
+
+               String schema =
+                       "struct<" +
+                               "middle:struct<" +
+                                       "list:array<" +
+                                               "struct<" +
+                                                       "int1:int," +
+                                                       "string1:string" +
+                                               ">" +
+                                       ">" +
+                               ">," +
+                               "list:array<" +
+                                       "struct<" +
+                                               "int1:int," +
+                                               "string1:string" +
+                                       ">" +
+                               ">," +
+                               "map:map<" +
+                                       "string," +
+                                       "struct<" +
+                                               "int1:int," +
+                                               "string1:string" +
+                                       ">" +
+                               ">" +
+                       ">";
+               TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+               Assert.assertNotNull(typeInfo);
+               Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+               RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+               // validate field types
+               Assert.assertArrayEquals(
+                       new TypeInformation[]{
+                               Types.ROW_NAMED(
+                                       new String[]{"list"},
+                                       ObjectArrayTypeInfo.getInfoFor(
+                                               Types.ROW_NAMED(
+                                                       new String[]{"int1", "string1"},
+                                                       Types.INT, Types.STRING
+                                               )
+                                       )
+                               ),
+                               ObjectArrayTypeInfo.getInfoFor(
+                                       Types.ROW_NAMED(
+                                               new String[]{"int1", "string1"},
+                                               Types.INT, Types.STRING
+                                       )
+                               ),
+                               new MapTypeInfo<>(
+                                       Types.STRING,
+                                       Types.ROW_NAMED(
+                                               new String[]{"int1", "string1"},
+                                               Types.INT, Types.STRING
+                                       )
+                               )
+                       },
+                       rowTypeInfo.getFieldTypes());
+
+               // validate field names
+               Assert.assertArrayEquals(
+                       new String[] {"middle", "list", "map"},
+                       rowTypeInfo.getFieldNames());
+       }
+}

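The direction of this mapping is worth keeping in mind: OrcUtils.schemaToTypeInfo() goes from an ORC TypeDescription to Flink TypeInformation, not the reverse. A minimal sketch of the call (the schema string is illustrative; the expected output follows from the field types asserted above):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.orc.TypeDescription;

    public class SchemaMappingSketch {
        public static void main(String[] args) {
            TypeDescription schema = TypeDescription.fromString("struct<int1:int,string1:string>");
            TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(schema);
            // Per the mapping verified above, this prints:
            // Row(int1: Integer, string1: String)
            System.out.println(typeInfo);
        }
    }
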
http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java
deleted file mode 100644
index 60008a0..0000000
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/RowOrcInputFormatTest.java
+++ /dev/null
@@ -1,472 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.conf.Configuration;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Tests for the {@link RowOrcInputFormat}.
- */
-
-public class RowOrcInputFormatTest {
-
-       private RowOrcInputFormat rowOrcInputFormat;
-
-       @After
-       public void tearDown() throws IOException {
-               if (rowOrcInputFormat != null) {
-                       rowOrcInputFormat.close();
-                       rowOrcInputFormat.closeInputFormat();
-               }
-               rowOrcInputFormat = null;
-       }
-
-       private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
-
-       private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
-               "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
-               "middle:struct<list:array<struct<int1:int,string1:string>>>," +
-               "list:array<struct<int1:int,string1:string>>," +
-               "map:map<string,struct<int1:int,string1:string>>>";
-
-       private static final String[] TEST1_DATA = new String[] {
-               "false,1,1024,65536,9223372036854775807,1.0,-15.0,[0, 1, 2, 3, 
4],hi,[1,bye, 2,sigh],[3,good, 4,bad],{}",
-               
"true,100,2048,65536,9223372036854775807,2.0,-5.0,[],bye,[1,bye, 2,sigh]," +
-                       "[100000000,cat, -100000,in, 1234,hat],{chani=5,chani, 
mauddib=1,mauddib}" };
-
-       private static final String[] TEST1_PROJECTED_DATA = new String[] {
-               "{},[3,good, 4,bad],[1,bye, 2,sigh],hi,[0, 1, 2, 3, 
4],-15.0,1.0,9223372036854775807,65536,1024,1,false",
-               "{chani=5,chani, mauddib=1,mauddib},[100000000,cat, -100000,in, 
1234,hat],[1,bye, 2,sigh],bye," +
-                       "[],-5.0,2.0,9223372036854775807,65536,2048,100,true" };
-
-       private static final String TEST1_INVALID_SCHEMA = "struct<boolean1:int,byte1:tinyint,short1:smallint,int1:int," +
-               "long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
-               "middle:struct<list:array<struct<int1:int,string1:string>>>," +
-               "list:array<struct<int1:int,string1:string>>," +
-               "map:map<string,struct<int1:int,string1:string>>>";
-
-       @Test(expected = FileNotFoundException.class)
-       public void testInvalidPath() throws IOException{
-
-               rowOrcInputFormat = new RowOrcInputFormat("TestOrcFile.test2.orc", TEST1_SCHEMA, new Configuration());
-               rowOrcInputFormat.openInputFormat();
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-               rowOrcInputFormat.open(inputSplits[0]);
-
-       }
-
-       @Test(expected = RuntimeException.class)
-       public void testInvalidSchema() throws IOException{
-
-               assert(test1URL != null);
-               rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_INVALID_SCHEMA, new Configuration());
-               rowOrcInputFormat.openInputFormat();
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-               rowOrcInputFormat.open(inputSplits[0]);
-
-       }
-
-       @Test(expected = IndexOutOfBoundsException.class)
-       public void testInvalidProjection() throws IOException{
-
-               assert(test1URL != null);
-               rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
-               int[] projectionMask = {14};
-               rowOrcInputFormat.setFieldMapping(projectionMask);
-       }
-
-       @Test
-       public void testMajorDataTypes() throws IOException{
-
-               // test for boolean,byte,short,int,long,float,double,bytes,string,struct,list,map
-               assert(test1URL != null);
-               rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
-               rowOrcInputFormat.openInputFormat();
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-               Assert.assertEquals(inputSplits.length, 1);
-
-               Row row = null;
-               int count = 0;
-               for (FileInputSplit split : inputSplits) {
-                       rowOrcInputFormat.open(split);
-                       while (!rowOrcInputFormat.reachedEnd()) {
-                               row = rowOrcInputFormat.nextRecord(row);
-                               Assert.assertEquals(row.toString(), TEST1_DATA[count++]);
-                       }
-               }
-       }
-
-       @Test
-       public void testProjection() throws IOException{
-
-               assert(test1URL != null);
-               rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
-               int[] projectionMask = {11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
-               rowOrcInputFormat.setFieldMapping(projectionMask);
-               rowOrcInputFormat.openInputFormat();
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-               Assert.assertEquals(inputSplits.length, 1);
-
-               Row row = null;
-               int count = 0;
-               for (FileInputSplit split : inputSplits) {
-                       rowOrcInputFormat.open(split);
-                       while (!rowOrcInputFormat.reachedEnd()) {
-                               row = rowOrcInputFormat.nextRecord(row);
-                               Assert.assertEquals(row.toString(), TEST1_PROJECTED_DATA[count++]);
-                       }
-               }
-
-       }
-
-       @Test
-       public void testTimeStampAndDate() throws IOException{
-
-               URL expectedDataURL = getClass().getClassLoader().getResource("TestOrcFile.testDate1900.dat");
-               assert(expectedDataURL != null);
-               List<String> expectedTimeStampAndDate = Files.readAllLines(Paths.get(expectedDataURL.getPath()));
-
-               URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.testDate1900.orc");
-               assert(testInputURL != null);
-               String path = testInputURL.getPath();
-               String schema = "struct<time:timestamp,date:date>";
-               rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-               rowOrcInputFormat.openInputFormat();
-
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-               Assert.assertEquals(inputSplits.length, 1);
-
-               List<Object> actualTimeStampAndDate = new ArrayList<>();
-
-               Row row = null;
-               int count = 0;
-               for (FileInputSplit split : inputSplits) {
-                       rowOrcInputFormat.open(split);
-                       while (!rowOrcInputFormat.reachedEnd()) {
-                               row = rowOrcInputFormat.nextRecord(row);
-                               count++;
-                               if (count <= 10000) {
-                                       actualTimeStampAndDate.add(row.getField(0) + "," + row.getField(1));
-                               }
-
-                       }
-               }
-               Assert.assertEquals(count, 70000);
-               Assert.assertEquals(expectedTimeStampAndDate.size(), actualTimeStampAndDate.size());
-               Assert.assertEquals(expectedTimeStampAndDate.toString(), actualTimeStampAndDate.toString());
-
-       }
-
-       @Test
-       public void testDecimal() throws IOException{
-
-               URL expectedDataURL = getClass().getClassLoader().getResource("decimal.dat");
-               List<String> expectedDecimal = Files.readAllLines(Paths.get(expectedDataURL.getPath()));
-
-               URL testInputURL = getClass().getClassLoader().getResource("decimal.orc");
-               assert(testInputURL != null);
-               String path = testInputURL.getPath();
-               String schema = "struct<_col0:decimal(10,5)>";
-               rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-               rowOrcInputFormat.openInputFormat();
-
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-               Assert.assertEquals(inputSplits.length, 1);
-
-               List<Object> actualDecimal = new ArrayList<>();
-
-               Row row = null;
-               for (FileInputSplit split : inputSplits) {
-                       rowOrcInputFormat.open(split);
-                       while (!rowOrcInputFormat.reachedEnd()) {
-                               row = rowOrcInputFormat.nextRecord(row);
-                               actualDecimal.add(row.getField(0));
-                       }
-               }
-
-               Assert.assertEquals(expectedDecimal.size(), actualDecimal.size());
-               Assert.assertEquals(expectedDecimal.toString(), actualDecimal.toString());
-
-       }
-
-       @Test
-       public void testEmptyFile() throws IOException{
-
-               URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.emptyFile.orc");
-               assert(testInputURL != null);
-               String path = testInputURL.getPath();
-
-               rowOrcInputFormat = new RowOrcInputFormat(path, TEST1_SCHEMA, new Configuration());
-               rowOrcInputFormat.openInputFormat();
-
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-               Assert.assertEquals(inputSplits.length, 1);
-
-               Row row = new Row(1);
-               int count = 0;
-               for (FileInputSplit split : inputSplits) {
-                       rowOrcInputFormat.open(split);
-                       while (!rowOrcInputFormat.reachedEnd()) {
-                               row = rowOrcInputFormat.nextRecord(row);
-                               count++;
-                       }
-               }
-
-               Assert.assertEquals(count, 0);
-       }
-
-       @Test
-       public void testLargeFile() throws IOException{
-
-               URL testInputURL = getClass().getClassLoader().getResource("demo-11-none.orc");
-               assert(testInputURL != null);
-               String path = testInputURL.getPath();
-               String schema = "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int," +
-                       "_col5:string,_col6:int,_col7:int,_col8:int>";
-
-               rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-               rowOrcInputFormat.openInputFormat();
-
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-               Assert.assertEquals(inputSplits.length, 1);
-
-               Row row = new Row(1);
-               int count = 0;
-               for (FileInputSplit split : inputSplits) {
-                       rowOrcInputFormat.open(split);
-                       while (!rowOrcInputFormat.reachedEnd()) {
-                               row = rowOrcInputFormat.nextRecord(row);
-                               count++;
-                       }
-               }
-
-               Assert.assertEquals(count, 1920800);
-       }
-
-       @Test
-       public void testProducedType() throws IOException{
-
-               assert(test1URL != null);
-               rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
-               rowOrcInputFormat.openInputFormat();
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-               Assert.assertEquals(inputSplits.length, 1);
-
-               rowOrcInputFormat.open(inputSplits[0]);
-
-               TypeInformation<Row> type = rowOrcInputFormat.getProducedType();
-               Assert.assertEquals(type.toString(), "Row(boolean1: Boolean, byte1: Byte, short1: Short, int1: Integer," +
-                       " long1: Long, float1: Float, double1: Double, bytes1: byte[], string1: String," +
-                       " middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>)," +
-                       " list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>," +
-                       " map: Map<String, Row(int1: Integer, string1: String)>)");
-
-       }
-
-       @Test
-       public void testProducedTypeWithProjection() throws IOException{
-
-               assert(test1URL != null);
-               rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
-               int[] projectionMask = {9, 10, 11};
-               rowOrcInputFormat.setFieldMapping(projectionMask);
-               rowOrcInputFormat.openInputFormat();
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-               Assert.assertEquals(inputSplits.length, 1);
-
-               rowOrcInputFormat.open(inputSplits[0]);
-
-               TypeInformation<Row> type = rowOrcInputFormat.getProducedType();
-               Assert.assertEquals(type.toString(), "Row(middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>)," +
-                       " list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>," +
-                       " map: Map<String, Row(int1: Integer, string1: String)>)");
-
-       }
-
-       @Test
-       public void testLongList() throws Exception {
-
-               URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.listlong.orc");
-               assert(testInputURL != null);
-               String path = testInputURL.getPath();
-               String schema = "struct<mylist1:array<bigint>>";
-
-               rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-
-               rowOrcInputFormat.openInputFormat();
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-               Assert.assertEquals(inputSplits.length, 1);
-
-               Row row = null;
-               long count = 0;
-               for (FileInputSplit split : inputSplits) {
-                       rowOrcInputFormat.open(split);
-                       while (!rowOrcInputFormat.reachedEnd()) {
-                               row = rowOrcInputFormat.nextRecord(row);
-                               Assert.assertEquals(row.getArity(), 1);
-                               Object object = row.getField(0);
-                               long[] l = (long[]) object;
-
-                               Assert.assertEquals(l.length, 2);
-                               if (count < 50) {
-                                       Assert.assertArrayEquals(l, new long[]{count, count + 1});
-                               }
-                               else {
-                                       Assert.assertArrayEquals(l, new long[]{0L, 0L});
-                               }
-                               count = count + 2;
-                       }
-               }
-               Assert.assertEquals(count, 100);
-       }
-
-       @Test
-       public void testStringList() throws Exception {
-
-               URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.liststring.orc");
-               assert(testInputURL != null);
-               String path = testInputURL.getPath();
-               String schema = "struct<mylist1:array<string>>";
-
-               rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-
-               rowOrcInputFormat.openInputFormat();
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-               Assert.assertEquals(inputSplits.length, 1);
-
-               Row row = null;
-               long count = 0;
-               for (FileInputSplit split : inputSplits) {
-                       rowOrcInputFormat.open(split);
-                       while (!rowOrcInputFormat.reachedEnd()) {
-                               row = rowOrcInputFormat.nextRecord(row);
-                               Assert.assertEquals(row.getArity(), 1);
-                               Object object = row.getField(0);
-                               String[] l = (String[]) object;
-
-                               Assert.assertEquals(l.length, 2);
-                               Assert.assertArrayEquals(l, new String[]{"hello" + count, "hello" + (count + 1) });
-                               count = count + 2;
-                       }
-               }
-               Assert.assertEquals(count, 200);
-       }
-
-       @Test
-       public void testListOfListOfStructOfLong() throws Exception {
-               URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.listliststructlong.orc");
-               assert(testInputURL != null);
-               String path = testInputURL.getPath();
-               String schema = "struct<mylist1:array<array<struct<mylong1:bigint>>>>";
-
-               rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-
-               rowOrcInputFormat.openInputFormat();
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
-
-               Assert.assertEquals(inputSplits.length, 1);
-
-               Row row = null;
-               long count = 0;
-               for (FileInputSplit split : inputSplits) {
-                       rowOrcInputFormat.open(split);
-                       while (!rowOrcInputFormat.reachedEnd()) {
-
-                               row = rowOrcInputFormat.nextRecord(row);
-                               Assert.assertEquals(row.getArity(), 1);
-
-                               Object[] objects = (Object[]) row.getField(0);
-                               Assert.assertEquals(objects.length, 1);
-
-                               Object[] objects1 = (Object[]) objects[0];
-                               Assert.assertEquals(objects1.length, 1);
-
-                               Row[] nestedRows = Arrays.copyOf(objects1, objects1.length, Row[].class);
-                               Assert.assertEquals(nestedRows.length, 1);
-
-                               Assert.assertEquals(nestedRows[0].getArity(), 1);
-
-                               Assert.assertEquals(nestedRows[0].getField(0), count);
-
-                               count++;
-                       }
-               }
-               Assert.assertEquals(count, 100);
-       }
-
-       @Test
-       public void testSplit() throws IOException{
-
-               URL testInputURL = getClass().getClassLoader().getResource("demo-11-none.orc");
-               assert(testInputURL != null);
-               String path = testInputURL.getPath();
-               String schema = "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int," +
-                       "_col5:string,_col6:int,_col7:int,_col8:int>";
-
-               rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
-               rowOrcInputFormat.openInputFormat();
-
-               FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(10);
-
-               Assert.assertEquals(inputSplits.length, 10);
-
-               Row row = null;
-               int countTotalRecords = 0;
-               for (FileInputSplit split : inputSplits) {
-                       rowOrcInputFormat.open(split);
-                       int countSplitRecords = 0;
-                       while (!rowOrcInputFormat.reachedEnd()) {
-                               row = rowOrcInputFormat.nextRecord(row);
-                               countSplitRecords++;
-                       }
-                       Assert.assertNotEquals(countSplitRecords, 1920800);
-                       countTotalRecords += countSplitRecords;
-               }
-
-               Assert.assertEquals(countTotalRecords, 1920800);
-       }
-
-}

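All of the deleted tests above drive the format through the same FileInputFormat lifecycle: openInputFormat(), createInputSplits(), then per split open()/reachedEnd()/nextRecord() with a reused Row, and finally close(). Any replacement format in this change has to honour that contract. A sketch of the loop written against the generic API, assuming nothing about the new class beyond FileInputFormat<Row> (the helper name and class are illustrative):

    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.types.Row;

    import java.io.IOException;

    public class ReadLoopSketch {
        // Counts all records across splits; `format` is assumed to be fully
        // configured by the caller (e.g. an ORC input format from this change).
        static long countRecords(FileInputFormat<Row> format, int numSplits) throws IOException {
            long count = 0;
            format.openInputFormat();
            for (FileInputSplit split : format.createInputSplits(numSplits)) {
                format.open(split);
                Row reuse = null;
                while (!format.reachedEnd()) {
                    reuse = format.nextRecord(reuse);
                    count++;
                }
                format.close();
            }
            format.closeInputFormat();
            return count;
        }
    }
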
http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc
deleted file mode 100644
index ecdadcb..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.emptyFile.orc and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc
deleted file mode 100644
index 0f3f9c8..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listliststructlong.orc and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc
deleted file mode 100644
index 648ea18..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.listlong.orc and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc
deleted file mode 100644
index 75a5f2a..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.liststring.orc and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/35517f12/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc b/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc
deleted file mode 100644
index 4fb0bef..0000000
Binary files a/flink-connectors/flink-orc/src/test/resources/TestOrcFile.test1.orc and /dev/null differ
