This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 37fbbd4  [FLINK-15348][hive] Fix orc optimization for version less 
than 2.3 by introducing orc shim
37fbbd4 is described below

commit 37fbbd4ba57d356b1877a9b7d600254550b21321
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Fri Dec 20 14:42:33 2019 +0800

    [FLINK-15348][hive] Fix orc optimization for version less than 2.3 by 
introducing orc shim
    
    add shim to support hive orc version less than 2.3
    
    this closes #10618.
---
 flink-connectors/flink-connector-hive/pom.xml      |  13 ++
 .../connectors/hive/read/HiveTableInputFormat.java |   2 +-
 .../hive/read/HiveVectorizedOrcSplitReader.java    |   2 +
 flink-formats/flink-orc/pom.xml                    |   8 +-
 .../flink/orc/OrcColumnarRowSplitReader.java       |   3 +
 .../org/apache/flink/orc/OrcRowSplitReader.java    |   3 +-
 .../java/org/apache/flink/orc/OrcSplitReader.java  | 127 +++----------
 .../org/apache/flink/orc/OrcSplitReaderUtil.java   |   3 +
 .../java/org/apache/flink/orc/shim/OrcShim.java    |  78 ++++++++
 .../org/apache/flink/orc/shim/OrcShimV200.java     | 200 +++++++++++++++++++++
 .../org/apache/flink/orc/shim/OrcShimV210.java     |  37 ++++
 .../org/apache/flink/orc/shim/OrcShimV230.java     |  57 ++++++
 .../flink/orc/OrcColumnarRowSplitReaderTest.java   |   1 +
 .../apache/flink/orc/OrcRowInputFormatTest.java    |  99 +++++-----
 pom.xml                                            |   1 +
 15 files changed, 480 insertions(+), 154 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml 
b/flink-connectors/flink-connector-hive/pom.xml
index 7a7bfb0..16ec6ae 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -761,6 +761,19 @@ under the License.
                        <properties>
                                <hive.version>2.2.0</hive.version>
                        </properties>
+                       <dependencies>
+                               <dependency>
+                                       <groupId>org.apache.orc</groupId>
+                                       <artifactId>orc-core</artifactId>
+                                       <version>${orc.version}</version>
+                                       <exclusions>
+                                               <exclusion>
+                                                       
<groupId>org.apache.hadoop</groupId>
+                                                       
<artifactId>hadoop-common</artifactId>
+                                               </exclusion>
+                                       </exclusions>
+                               </dependency>
+                       </dependencies>
                </profile>
                <profile>
                        <id>hive-3.1.1</id>
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
index 8d28851..3e45fe3 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
@@ -115,7 +115,7 @@ public class HiveTableInputFormat extends 
HadoopInputFormatCommonBase<BaseRow, H
                if 
(!parameters.getBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER) &&
                                
useOrcVectorizedRead(split.getHiveTablePartition())) {
                        this.reader = new HiveVectorizedOrcSplitReader(
-                                       jobConf, fieldNames, fieldTypes, 
selectedFields, split);
+                                       hiveVersion, jobConf, fieldNames, 
fieldTypes, selectedFields, split);
                } else {
                        this.reader = new HiveMapredSplitReader(jobConf, 
partitionKeys, fieldTypes, selectedFields, split,
                                        
HiveShimLoader.loadHiveShim(hiveVersion));
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedOrcSplitReader.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedOrcSplitReader.java
index 9dbad8a..c1aa2d1 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedOrcSplitReader.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedOrcSplitReader.java
@@ -43,6 +43,7 @@ public class HiveVectorizedOrcSplitReader implements 
SplitReader {
        private OrcColumnarRowSplitReader reader;
 
        public HiveVectorizedOrcSplitReader(
+                       String hiveVersion,
                        JobConf jobConf,
                        String[] fieldNames,
                        DataType[] fieldTypes,
@@ -62,6 +63,7 @@ public class HiveVectorizedOrcSplitReader implements 
SplitReader {
                }
 
                this.reader = genPartColumnarRowReader(
+                               hiveVersion,
                                conf,
                                fieldNames,
                                fieldTypes,
diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml
index 4df500c..e05cee0 100644
--- a/flink-formats/flink-orc/pom.xml
+++ b/flink-formats/flink-orc/pom.xml
@@ -73,7 +73,7 @@ under the License.
                <dependency>
                        <groupId>org.apache.orc</groupId>
                        <artifactId>orc-core</artifactId>
-                       <version>1.4.3</version>
+                       <version>${orc.version}</version>
                        <exclusions>
                                <!-- Exclude ORC's Hadoop dependency and pull 
in Flink's shaded Hadoop. -->
                                <exclusion>
@@ -91,6 +91,12 @@ under the License.
                        <scope>provided</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.commons</groupId>
+                       <artifactId>commons-lang3</artifactId>
+                       <scope>provided</scope>
+               </dependency>
+
                <!-- test dependencies -->
 
                <dependency>
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java
index ffdbd05..bea6331 100644
--- 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java
@@ -19,6 +19,7 @@
 package org.apache.flink.orc;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.table.dataformat.BaseRow;
 import org.apache.flink.table.dataformat.ColumnarRow;
 import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch;
@@ -41,6 +42,7 @@ public class OrcColumnarRowSplitReader extends 
OrcSplitReader<BaseRow> {
        private final ColumnarRow row;
 
        public OrcColumnarRowSplitReader(
+                       OrcShim shim,
                        Configuration conf,
                        TypeDescription schema,
                        int[] selectedFields,
@@ -51,6 +53,7 @@ public class OrcColumnarRowSplitReader extends 
OrcSplitReader<BaseRow> {
                        long splitStart,
                        long splitLength) throws IOException {
                super(
+                               shim,
                                conf,
                                schema,
                                selectedFields,
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowSplitReader.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowSplitReader.java
index 1f2bdf9..b4f9697 100644
--- 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowSplitReader.java
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowSplitReader.java
@@ -19,6 +19,7 @@
 package org.apache.flink.orc;
 
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.conf.Configuration;
@@ -46,7 +47,7 @@ public class OrcRowSplitReader extends OrcSplitReader<Row> {
                        Path path,
                        long splitStart,
                        long splitLength) throws IOException {
-               super(conf, schema, selectedFields, conjunctPredicates, 
batchSize, path, splitStart, splitLength);
+               super(OrcShim.defaultShim(), conf, schema, selectedFields, 
conjunctPredicates, batchSize, path, splitStart, splitLength);
                this.schema = schema;
                this.selectedFields = selectedFields;
                // create and initialize the row batch
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReader.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReader.java
index e4930ae..b85bcb5 100644
--- 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReader.java
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReader.java
@@ -19,21 +19,16 @@
 package org.apache.flink.orc;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.shim.OrcShim;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.orc.OrcConf;
-import org.apache.orc.OrcFile;
-import org.apache.orc.Reader;
 import org.apache.orc.RecordReader;
-import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 
 import java.io.Closeable;
@@ -51,20 +46,21 @@ import java.util.List;
  */
 public abstract class OrcSplitReader<T> implements Closeable {
 
+       private final OrcShim shim;
+
        // the ORC reader
        private RecordReader orcRowsReader;
 
        // the vectorized row data to be read in a batch
        protected final VectorizedRowBatch rowBatch;
 
-       private final Reader.Options options;
-
        // the number of rows in the current batch
        private int rowsInBatch;
        // the index of the next row to return
        protected int nextRow;
 
        public OrcSplitReader(
+                       OrcShim shim,
                        Configuration conf,
                        TypeDescription schema,
                        int[] selectedFields,
@@ -73,47 +69,21 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                        Path path,
                        long splitStart,
                        long splitLength) throws IOException {
-               // open ORC file and create reader
-               org.apache.hadoop.fs.Path hPath = new 
org.apache.hadoop.fs.Path(path.getPath());
-               Reader orcReader = OrcFile.createReader(hPath, 
OrcFile.readerOptions(conf));
-
-               // get offset and length for the stripes that start in the split
-               Tuple2<Long, Long> offsetAndLength = getOffsetAndLengthForSplit(
-                               splitStart, splitLength, getStripes(orcReader));
-
-               // create ORC row reader configuration
-               this.options = orcReader.options()
-                               .schema(schema)
-                               .range(offsetAndLength.f0, offsetAndLength.f1)
-                               
.useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
-                               
.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf))
-                               
.tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
-
-               // configure filters
-               if (!conjunctPredicates.isEmpty()) {
-                       SearchArgument.Builder b = 
SearchArgumentFactory.newBuilder();
-                       b = b.startAnd();
-                       for (Predicate predicate : conjunctPredicates) {
-                               predicate.add(b);
-                       }
-                       b = b.end();
-                       options.searchArgument(b.build(), new String[]{});
-               }
+               this.shim = shim;
+               this.orcRowsReader = shim.createRecordReader(
+                               conf, schema, selectedFields, 
conjunctPredicates, path, splitStart, splitLength);
 
-               // configure selected fields
-               options.include(computeProjectionMask(schema, selectedFields));
-
-               // create ORC row reader
-               this.orcRowsReader = orcReader.rows(options);
-
-               // assign ids
-               schema.getId();
                // create row batch
                this.rowBatch = schema.createRowBatch(batchSize);
                rowsInBatch = 0;
                nextRow = 0;
        }
 
+       @VisibleForTesting
+       public RecordReader getRecordReader() {
+               return orcRowsReader;
+       }
+
        /**
         * Method used to check if the end of the input is reached.
         *
@@ -154,7 +124,7 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                        // No more rows available in the Rows array.
                        nextRow = 0;
                        // Try to read the next batch if rows from the ORC file.
-                       boolean moreRows = orcRowsReader.nextBatch(rowBatch);
+                       boolean moreRows = shim.nextBatch(orcRowsReader, 
rowBatch);
 
                        if (moreRows) {
                                // Load the data into the Rows array.
@@ -166,57 +136,6 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                return true;
        }
 
-       private Tuple2<Long, Long> getOffsetAndLengthForSplit(
-                       long splitStart, long splitLength, 
List<StripeInformation> stripes) {
-               long splitEnd = splitStart + splitLength;
-               long readStart = Long.MAX_VALUE;
-               long readEnd = Long.MIN_VALUE;
-
-               for (StripeInformation s : stripes) {
-                       if (splitStart <= s.getOffset() && s.getOffset() < 
splitEnd) {
-                               // stripe starts in split, so it is included
-                               readStart = Math.min(readStart, s.getOffset());
-                               readEnd = Math.max(readEnd, s.getOffset() + 
s.getLength());
-                       }
-               }
-
-               if (readStart < Long.MAX_VALUE) {
-                       // at least one split is included
-                       return Tuple2.of(readStart, readEnd - readStart);
-               } else {
-                       return Tuple2.of(0L, 0L);
-               }
-       }
-
-       @VisibleForTesting
-       Reader.Options getOptions() {
-               return options;
-       }
-
-       @VisibleForTesting
-       List<StripeInformation> getStripes(Reader orcReader) {
-               return orcReader.getStripes();
-       }
-
-       /**
-        * Computes the ORC projection mask of the fields to include from the 
selected fields.rowOrcInputFormat.nextRecord(null).
-        *
-        * @return The ORC projection mask.
-        */
-       private boolean[] computeProjectionMask(TypeDescription schema, int[] 
selectedFields) {
-               // mask with all fields of the schema
-               boolean[] projectionMask = new boolean[schema.getMaximumId() + 
1];
-               // for each selected field
-               for (int inIdx : selectedFields) {
-                       // set all nested fields of a selected field to true
-                       TypeDescription fieldSchema = 
schema.getChildren().get(inIdx);
-                       for (int i = fieldSchema.getId(); i <= 
fieldSchema.getMaximumId(); i++) {
-                               projectionMask[i] = true;
-                       }
-               }
-               return projectionMask;
-       }
-
        @Override
        public void close() throws IOException {
                if (orcRowsReader != null) {
@@ -233,7 +152,7 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
         * A filter predicate that can be evaluated by the OrcInputFormat.
         */
        public abstract static class Predicate implements Serializable {
-               protected abstract SearchArgument.Builder 
add(SearchArgument.Builder builder);
+               public abstract SearchArgument.Builder 
add(SearchArgument.Builder builder);
        }
 
        abstract static class ColumnPredicate extends Predicate {
@@ -334,7 +253,7 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                }
 
                @Override
-               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+               public SearchArgument.Builder add(SearchArgument.Builder 
builder) {
                        return builder.equals(columnName, literalType, 
castLiteral(literal));
                }
 
@@ -360,7 +279,7 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                }
 
                @Override
-               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+               public SearchArgument.Builder add(SearchArgument.Builder 
builder) {
                        return builder.nullSafeEquals(columnName, literalType, 
castLiteral(literal));
                }
 
@@ -386,7 +305,7 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                }
 
                @Override
-               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+               public SearchArgument.Builder add(SearchArgument.Builder 
builder) {
                        return builder.lessThan(columnName, literalType, 
castLiteral(literal));
                }
 
@@ -412,7 +331,7 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                }
 
                @Override
-               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+               public SearchArgument.Builder add(SearchArgument.Builder 
builder) {
                        return builder.lessThanEquals(columnName, literalType, 
castLiteral(literal));
                }
 
@@ -437,7 +356,7 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                }
 
                @Override
-               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+               public SearchArgument.Builder add(SearchArgument.Builder 
builder) {
                        return builder.isNull(columnName, literalType);
                }
 
@@ -469,7 +388,7 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                }
 
                @Override
-               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+               public SearchArgument.Builder add(SearchArgument.Builder 
builder) {
                        return builder.between(columnName, literalType, 
castLiteral(lowerBound), castLiteral(upperBound));
                }
 
@@ -498,7 +417,7 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                }
 
                @Override
-               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+               public SearchArgument.Builder add(SearchArgument.Builder 
builder) {
                        Object[] castedLiterals = new Object[literals.length];
                        for (int i = 0; i < literals.length; i++) {
                                castedLiterals[i] = castLiteral(literals[i]);
@@ -527,7 +446,7 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                        this.pred = predicate;
                }
 
-               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+               public SearchArgument.Builder add(SearchArgument.Builder 
builder) {
                        return pred.add(builder.startNot()).end();
                }
 
@@ -557,7 +476,7 @@ public abstract class OrcSplitReader<T> implements 
Closeable {
                }
 
                @Override
-               protected SearchArgument.Builder add(SearchArgument.Builder 
builder) {
+               public SearchArgument.Builder add(SearchArgument.Builder 
builder) {
                        SearchArgument.Builder withOr = builder.startOr();
                        for (Predicate p : preds) {
                                withOr = p.add(withOr);
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java
index 550e5d2..f3f6e91 100644
--- 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.orc;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.orc.OrcColumnarRowSplitReader.ColumnBatchGenerator;
+import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.dataformat.vector.ColumnVector;
 import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch;
@@ -54,6 +55,7 @@ public class OrcSplitReaderUtil {
         * Util for generating partitioned {@link OrcColumnarRowSplitReader}.
         */
        public static OrcColumnarRowSplitReader genPartColumnarRowReader(
+                       String hiveVersion,
                        Configuration conf,
                        String[] fullFieldNames,
                        DataType[] fullFieldTypes,
@@ -89,6 +91,7 @@ public class OrcSplitReaderUtil {
                };
 
                return new OrcColumnarRowSplitReader(
+                               OrcShim.createShim(hiveVersion),
                                conf,
                                convertToOrcTypeWithPart(fullFieldNames, 
fullFieldTypes, partitionSpec.keySet()),
                                selectedOrcFields,
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShim.java 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShim.java
new file mode 100644
index 0000000..ca4fd2e
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShim.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.shim;
+
+import org.apache.flink.orc.OrcSplitReader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A shim layer to support ORC across the different Hive versions Flink may depend on.
+ */
+public interface OrcShim extends Serializable {
+
+       /**
+        * Creates an ORC {@link RecordReader} from the conf, schema, selected fields, etc.
+        */
+       RecordReader createRecordReader(
+                       Configuration conf,
+                       TypeDescription schema,
+                       int[] selectedFields,
+                       List<OrcSplitReader.Predicate> conjunctPredicates,
+                       org.apache.flink.core.fs.Path path,
+                       long splitStart,
+                       long splitLength) throws IOException;
+
+       /**
+        * Read the next row batch.
+        */
+       boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) 
throws IOException;
+
+       /**
+        * Default shim when using the ORC dependency directly; this is the v2.3.0 shim.
+        */
+       static OrcShim defaultShim() {
+               return new OrcShimV230();
+       }
+
+       /**
+        * Create shim from hive version.
+        */
+       static OrcShim createShim(String hiveVersion) {
+               if (hiveVersion.startsWith("2.0")) {
+                       return new OrcShimV200();
+               } else if (hiveVersion.startsWith("2.1")) {
+                       return new OrcShimV210();
+               } else if (hiveVersion.startsWith("2.2") ||
+                               hiveVersion.startsWith("2.3") ||
+                               hiveVersion.startsWith("3.")) {
+                       return new OrcShimV230();
+               } else {
+                       throw new UnsupportedOperationException(
+                                       "Unsupported hive version for orc shim: 
" + hiveVersion);
+               }
+       }
+}
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV200.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV200.java
new file mode 100644
index 0000000..3e59a75
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV200.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.shim;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.orc.OrcSplitReader.Predicate;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.orc.OrcConf;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+
+import static 
org.apache.commons.lang3.reflect.ConstructorUtils.invokeConstructor;
+import static org.apache.commons.lang3.reflect.MethodUtils.invokeExactMethod;
+import static org.apache.commons.lang3.reflect.MethodUtils.invokeStaticMethod;
+
+/**
+ * ORC shim for Hive version 2.0.0 and later versions.
+ */
+public class OrcShimV200 implements OrcShim {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient Method hasNextMethod;
+       private transient Method nextBatchMethod;
+
+       protected Reader createReader(Path path, Configuration conf) throws 
IOException {
+               try {
+                       Class orcFileClass = 
Class.forName("org.apache.hadoop.hive.ql.io.orc.OrcFile");
+                       Object readerOptions = invokeStaticMethod(orcFileClass, 
"readerOptions", conf);
+
+                       Class readerClass = 
Class.forName("org.apache.hadoop.hive.ql.io.orc.ReaderImpl");
+                       //noinspection unchecked
+                       return (Reader) invokeConstructor(readerClass, path, 
readerOptions);
+               } catch (ClassNotFoundException |
+                               NoSuchMethodException |
+                               IllegalAccessException |
+                               InstantiationException |
+                               InvocationTargetException e) {
+                       throw new IOException(e);
+               }
+       }
+
+       protected RecordReader createRecordReader(Reader reader, Reader.Options 
options) throws IOException {
+               try {
+                       return (RecordReader) invokeExactMethod(reader, 
"rowsOptions", options);
+               } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException e) {
+                       throw new IOException(e);
+               }
+       }
+
+       protected Reader.Options readOrcConf(Reader.Options options, 
Configuration conf) {
+               return 
options.useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf))
+                               
.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf));
+       }
+
+       @Override
+       public RecordReader createRecordReader(
+                       Configuration conf,
+                       TypeDescription schema,
+                       int[] selectedFields,
+                       List<Predicate> conjunctPredicates,
+                       org.apache.flink.core.fs.Path path,
+                       long splitStart,
+                       long splitLength) throws IOException {
+               // open ORC file and create reader
+               Path hPath = new Path(path.toUri());
+
+               Reader orcReader = createReader(hPath, conf);
+
+               // get offset and length for the stripes that start in the split
+               Tuple2<Long, Long> offsetAndLength = getOffsetAndLengthForSplit(
+                               splitStart, splitLength, 
orcReader.getStripes());
+
+               // create ORC row reader configuration
+               Reader.Options options = readOrcConf(
+                               new 
Reader.Options().schema(schema).range(offsetAndLength.f0, offsetAndLength.f1),
+                               conf);
+
+               // configure filters
+               if (!conjunctPredicates.isEmpty()) {
+                       SearchArgument.Builder b = 
SearchArgumentFactory.newBuilder();
+                       b = b.startAnd();
+                       for (Predicate predicate : conjunctPredicates) {
+                               predicate.add(b);
+                       }
+                       b = b.end();
+                       options.searchArgument(b.build(), new String[]{});
+               }
+
+               // configure selected fields
+               options.include(computeProjectionMask(schema, selectedFields));
+
+               // create ORC row reader
+               RecordReader orcRowsReader = createRecordReader(orcReader, 
options);
+
+               // assign ids
+               schema.getId();
+
+               return orcRowsReader;
+       }
+
+       @Override
+       public boolean nextBatch(RecordReader reader, VectorizedRowBatch 
rowBatch) throws IOException {
+               try {
+                       if (hasNextMethod == null) {
+                               hasNextMethod = 
Class.forName("org.apache.hadoop.hive.ql.io.orc.RecordReader")
+                                               .getMethod("hasNext");
+                               hasNextMethod.setAccessible(true);
+                       }
+                       if (nextBatchMethod == null) {
+                               nextBatchMethod = 
RecordReader.class.getMethod("nextBatch", VectorizedRowBatch.class);
+                               nextBatchMethod.setAccessible(true);
+                       }
+                       boolean hasNext = (boolean) 
hasNextMethod.invoke(reader);
+                       if (hasNext) {
+                               nextBatchMethod.invoke(reader, rowBatch);
+                               return true;
+                       } else {
+                               return false;
+                       }
+               } catch (IllegalAccessException |
+                               InvocationTargetException |
+                               NoSuchMethodException |
+                               ClassNotFoundException e) {
+                       throw new IOException(e);
+               }
+       }
+
+       @VisibleForTesting
+       public static Tuple2<Long, Long> getOffsetAndLengthForSplit(
+                       long splitStart, long splitLength, 
List<StripeInformation> stripes) {
+               long splitEnd = splitStart + splitLength;
+               long readStart = Long.MAX_VALUE;
+               long readEnd = Long.MIN_VALUE;
+
+               for (StripeInformation s : stripes) {
+                       if (splitStart <= s.getOffset() && s.getOffset() < 
splitEnd) {
+                               // stripe starts in split, so it is included
+                               readStart = Math.min(readStart, s.getOffset());
+                               readEnd = Math.max(readEnd, s.getOffset() + 
s.getLength());
+                       }
+               }
+
+               if (readStart < Long.MAX_VALUE) {
+                       // at least one stripe is included
+                       return Tuple2.of(readStart, readEnd - readStart);
+               } else {
+                       return Tuple2.of(0L, 0L);
+               }
+       }
+
+       /**
+        * Computes the ORC projection mask of the fields to include from the 
selected fields.
+        *
+        * @return The ORC projection mask.
+        */
+       private static boolean[] computeProjectionMask(TypeDescription schema, 
int[] selectedFields) {
+               // mask with all fields of the schema
+               boolean[] projectionMask = new boolean[schema.getMaximumId() + 
1];
+               // for each selected field
+               for (int inIdx : selectedFields) {
+                       // set all nested fields of a selected field to true
+                       TypeDescription fieldSchema = 
schema.getChildren().get(inIdx);
+                       for (int i = fieldSchema.getId(); i <= 
fieldSchema.getMaximumId(); i++) {
+                               projectionMask[i] = true;
+                       }
+               }
+               return projectionMask;
+       }
+
+}
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV210.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV210.java
new file mode 100644
index 0000000..90be3bb
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV210.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.shim;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.RecordReader;
+
+import java.io.IOException;
+
+/**
+ * Shim orc for Hive version 2.1.0 and upper versions.
+ */
+public class OrcShimV210 extends OrcShimV200 {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       public boolean nextBatch(RecordReader reader, VectorizedRowBatch 
rowBatch) throws IOException {
+               return reader.nextBatch(rowBatch);
+       }
+}
diff --git 
a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV230.java
 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV230.java
new file mode 100644
index 0000000..6df366e
--- /dev/null
+++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV230.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.shim;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+
+import java.io.IOException;
+
+/**
+ * Shim orc for Hive version 2.3.0 and upper versions.
+ *
+ * <p>From hive 2.3 onwards, the orc API is basically stable, so we can 
call it directly.
+ *
+ * <p>Since hive 2.2 does not include the orc classes, we can use hive orc 2.3 to 
read from hive 2.2.
+ */
+public class OrcShimV230 extends OrcShimV210 {
+
+       private static final long serialVersionUID = 1L;
+
+       @Override
+       protected Reader createReader(Path path,
+                       Configuration conf) throws IOException {
+               return OrcFile.createReader(path, OrcFile.readerOptions(conf));
+       }
+
+       @Override
+       protected RecordReader createRecordReader(Reader reader, Reader.Options 
options) throws IOException {
+               return reader.rows(options);
+       }
+
+       @Override
+       protected Reader.Options readOrcConf(Reader.Options options, 
Configuration conf) {
+               return super.readOrcConf(options, conf)
+                               
.tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
+       }
+}
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
index cf93206..83513ba 100644
--- 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
@@ -402,6 +402,7 @@ public class OrcColumnarRowSplitReaderTest {
                        Map<String, Object> partitionSpec,
                        FileInputSplit split) throws IOException {
                return OrcSplitReaderUtil.genPartColumnarRowReader(
+                               "2.3.0",
                                new Configuration(),
                                IntStream.range(0, fullTypes.length)
                                                .mapToObj(i -> "f" + 
i).toArray(String[]::new),
diff --git 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
index f85d964..9481c72 100644
--- 
a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
+++ 
b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
@@ -21,22 +21,22 @@ package org.apache.flink.orc;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
 import org.apache.orc.StripeInformation;
-import org.apache.orc.TypeDescription;
+import org.apache.orc.impl.RecordReaderImpl;
+import org.apache.orc.impl.SchemaEvolution;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.FileNotFoundException;
@@ -47,8 +47,9 @@ import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
-import java.util.stream.IntStream;
 
+import static org.apache.commons.lang3.reflect.FieldUtils.readDeclaredField;
+import static org.apache.flink.orc.shim.OrcShimV200.getOffsetAndLengthForSplit;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -176,7 +177,7 @@ public class OrcRowInputFormatTest {
        }
 
        @Test
-       public void testProjectionMaskNested() throws IOException{
+       public void testProjectionMaskNested() throws Exception {
                rowOrcInputFormat =
                        new OrcRowInputFormat(getPath(TEST_FILE_NESTED), 
TEST_SCHEMA_NESTED, new Configuration());
 
@@ -196,11 +197,16 @@ public class OrcRowInputFormatTest {
                        true, true, true, true, true, // nested field 9 is in
                        false, false, false, false, // nested field 10 is out
                        true, true, true, true, true}; // nested field 11 is in
-               assertArrayEquals(expected, 
spy.getReader().getOptions().getInclude());
+               assertArrayEquals(expected, 
getInclude(spy.getReader().getRecordReader()));
+       }
+
+       private static boolean[] getInclude(RecordReader reader) throws 
IllegalAccessException {
+               SchemaEvolution evolution = (SchemaEvolution) 
readDeclaredField(reader, "evolution", true);
+               return evolution.getReaderIncluded();
        }
 
        @Test
-       public void testSplitStripesGivenSplits() throws IOException {
+       public void testSplitStripesGivenSplits() throws Exception {
                rowOrcInputFormat =
                        new OrcRowInputFormat(getPath(TEST_FILE_FLAT), 
TEST_SCHEMA_FLAT, new Configuration());
 
@@ -210,37 +216,34 @@ public class OrcRowInputFormatTest {
 
                spy.openInputFormat();
                spy.open(splits[0]);
-               assertOptions(spy.getReader(), 3L, 137005L);
+               assertOffsetAndLen(spy.getReader(), 3L, 137005L);
                spy.open(splits[1]);
-               assertOptions(spy.getReader(), 137008L, 136182L);
+               assertOffsetAndLen(spy.getReader(), 137008L, 136182L);
                spy.open(splits[2]);
-               assertOptions(spy.getReader(), 273190L, 123633L);
+               assertOffsetAndLen(spy.getReader(), 273190L, 123633L);
        }
 
-       private static void assertOptions(OrcSplitReader reader, long offset, 
long length) {
-               Reader.Options options = reader.getOptions();
-               Assert.assertEquals(offset, options.getOffset());
-               Assert.assertEquals(length, options.getLength());
+       @SuppressWarnings("unchecked")
+       private static List<StripeInformation> getStripes(RecordReader reader) 
throws IllegalAccessException {
+               return (List<StripeInformation>) readDeclaredField(reader, 
"stripes", true);
        }
 
-       private static OrcRowSplitReader newReaderWithStripes(
-                       List<StripeInformation> stripes, FileInputSplit split) 
throws IOException {
-               TypeDescription td = 
TypeDescription.fromString(TEST_SCHEMA_FLAT);
-               return new OrcRowSplitReader(
-                               new Configuration(),
-                               td,
-                               IntStream.range(0, 
td.getChildren().size()).toArray(),
-                               new ArrayList<>(),
-                               1024,
-                               split.getPath(),
-                               split.getStart(),
-                               split.getLength()) {
-
-                       @Override
-                       List<StripeInformation> getStripes(Reader orcReader) {
-                               return stripes;
+       private static void assertOffsetAndLen(
+                       OrcSplitReader reader, long offset, long length) throws 
IllegalAccessException {
+               List<StripeInformation> stripes = 
getStripes(reader.getRecordReader());
+               long min = Long.MAX_VALUE;
+               long max = Long.MIN_VALUE;
+               for (StripeInformation stripe : stripes) {
+                       if (stripe.getOffset() < min) {
+                               min = stripe.getOffset();
                        }
-               };
+                       if (stripe.getOffset() + stripe.getLength() > max) {
+                               max = stripe.getOffset() + stripe.getLength();
+                       }
+               }
+
+               assertEquals(offset, min);
+               assertEquals(length, max - min);
        }
 
        @Test
@@ -269,21 +272,17 @@ public class OrcRowInputFormatTest {
                stripes.add(stripe5);
 
                // split ranging 2 stripes
-               OrcRowSplitReader reader = newReaderWithStripes(
-                               stripes, new FileInputSplit(0, new 
Path(getPath(TEST_FILE_FLAT)), 0, 150, new String[]{}));
-               assertOptions(reader, 10L, 190L);
+               assertEquals(new Tuple2<>(10L, 190L), 
getOffsetAndLengthForSplit(0, 150, stripes));
+
                // split ranging 0 stripes
-               reader = newReaderWithStripes(
-                               stripes, new FileInputSplit(1, new 
Path(getPath(TEST_FILE_FLAT)), 150, 10, new String[]{}));
-               assertOptions(reader, 0L, 0L);
+               assertEquals(new Tuple2<>(0L, 0L), 
getOffsetAndLengthForSplit(150, 10, stripes));
+
                // split ranging 1 stripe
-               reader = newReaderWithStripes(
-                               stripes, new FileInputSplit(2, new 
Path(getPath(TEST_FILE_FLAT)), 160, 41, new String[]{}));
-               assertOptions(reader, 200L, 100L);
+               assertEquals(new Tuple2<>(200L, 100L), 
getOffsetAndLengthForSplit(160, 41, stripes));
+
                // split ranging 2 stripe
-               reader = newReaderWithStripes(
-                               stripes, new FileInputSplit(3, new 
Path(getPath(TEST_FILE_FLAT)), 201, 299, new String[]{}));
-               assertOptions(reader, 300L, 200L);
+               assertEquals(new Tuple2<>(300L, 200L), 
getOffsetAndLengthForSplit(201, 299, stripes));
+
        }
 
        @Test
@@ -421,7 +420,7 @@ public class OrcRowInputFormatTest {
                spy.open(splits[0]);
 
                // verify predicate configuration
-               SearchArgument sarg = 
spy.getReader().getOptions().getSearchArgument();
+               SearchArgument sarg = 
getSearchArgument(spy.getReader().getRecordReader());
                assertNotNull(sarg);
                assertEquals("(and leaf-0 leaf-1 leaf-2 leaf-3 leaf-4 leaf-5 
leaf-6 leaf-7 leaf-8)", sarg.getExpression().toString());
                assertEquals(9, sarg.getLeaves().size());
@@ -437,6 +436,12 @@ public class OrcRowInputFormatTest {
                assertEquals("(EQUALS string1 hello)", 
leaves.get(8).toString());
        }
 
+       private static SearchArgument getSearchArgument(RecordReader reader) 
throws IllegalAccessException {
+               RecordReaderImpl.SargApplier applier =
+                               (RecordReaderImpl.SargApplier) 
readDeclaredField(reader, "sargApp", true);
+               return (SearchArgument) readDeclaredField(applier, "sarg", 
true);
+       }
+
        @Test
        public void testTimePredicates() throws Exception {
                rowOrcInputFormat =
@@ -461,7 +466,7 @@ public class OrcRowInputFormatTest {
                spy.open(splits[0]);
 
                // verify predicate configuration
-               SearchArgument sarg = 
spy.getReader().getOptions().getSearchArgument();
+               SearchArgument sarg = 
getSearchArgument(spy.getReader().getRecordReader());
                assertNotNull(sarg);
                assertEquals("(or leaf-0 leaf-1)", 
sarg.getExpression().toString());
                assertEquals(2, sarg.getLeaves().size());
@@ -490,7 +495,7 @@ public class OrcRowInputFormatTest {
                spy.open(splits[0]);
 
                // verify predicate configuration
-               SearchArgument sarg = 
spy.getReader().getOptions().getSearchArgument();
+               SearchArgument sarg = 
getSearchArgument(spy.getReader().getRecordReader());
                assertNotNull(sarg);
                assertEquals("(not leaf-0)", sarg.getExpression().toString());
                assertEquals(1, sarg.getLeaves().size());
diff --git a/pom.xml b/pom.xml
index 20e48b2..c4343e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -146,6 +146,7 @@ under the License.
                <minikdc.version>3.2.0</minikdc.version>
                
<generated.docs.dir>./docs/_includes/generated</generated.docs.dir>
                <hive.version>2.3.4</hive.version>
+               <orc.version>1.4.3</orc.version>
                <!--
                        Hive 2.3.4 relies on Hadoop 2.7.2 and later versions.
                        For Hadoop 2.7, the minor Hadoop version supported for 
flink-shaded-hadoop-2-uber is 2.7.5

Reply via email to