This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 37fbbd4  [FLINK-15348][hive] Fix orc optimization for version less than 2.3 by introducing orc shim
37fbbd4 is described below

commit 37fbbd4ba57d356b1877a9b7d600254550b21321
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Fri Dec 20 14:42:33 2019 +0800

    [FLINK-15348][hive] Fix orc optimization for version less than 2.3 by introducing orc shim

    add shim to support hive orc version less than 2.3

    this closes #10618.
---
 flink-connectors/flink-connector-hive/pom.xml       |  13 ++
 .../connectors/hive/read/HiveTableInputFormat.java  |   2 +-
 .../hive/read/HiveVectorizedOrcSplitReader.java     |   2 +
 flink-formats/flink-orc/pom.xml                     |   8 +-
 .../flink/orc/OrcColumnarRowSplitReader.java        |   3 +
 .../org/apache/flink/orc/OrcRowSplitReader.java     |   3 +-
 .../java/org/apache/flink/orc/OrcSplitReader.java   | 127 +++----------
 .../org/apache/flink/orc/OrcSplitReaderUtil.java    |   3 +
 .../java/org/apache/flink/orc/shim/OrcShim.java     |  78 ++++++++
 .../org/apache/flink/orc/shim/OrcShimV200.java      | 200 +++++++++++++++++++++
 .../org/apache/flink/orc/shim/OrcShimV210.java      |  37 ++++
 .../org/apache/flink/orc/shim/OrcShimV230.java      |  57 ++++++
 .../flink/orc/OrcColumnarRowSplitReaderTest.java    |   1 +
 .../apache/flink/orc/OrcRowInputFormatTest.java     |  99 +++++-----
 pom.xml                                             |   1 +
 15 files changed, 480 insertions(+), 154 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 7a7bfb0..16ec6ae 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -761,6 +761,19 @@ under the License.
 			<properties>
 				<hive.version>2.2.0</hive.version>
 			</properties>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.orc</groupId>
+					<artifactId>orc-core</artifactId>
+					<version>${orc.version}</version>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.hadoop</groupId>
+							<artifactId>hadoop-common</artifactId>
+						</exclusion>
+					</exclusions>
+				</dependency>
+			</dependencies>
 		</profile>
 		<profile>
 			<id>hive-3.1.1</id>
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
index 8d28851..3e45fe3 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveTableInputFormat.java
@@ -115,7 +115,7 @@ public class HiveTableInputFormat extends HadoopInputFormatCommonBase<BaseRow, H
 		if (!parameters.getBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER) &&
 				useOrcVectorizedRead(split.getHiveTablePartition())) {
 			this.reader = new HiveVectorizedOrcSplitReader(
-					jobConf, fieldNames, fieldTypes, selectedFields, split);
+					hiveVersion, jobConf, fieldNames, fieldTypes, selectedFields, split);
 		} else {
 			this.reader = new HiveMapredSplitReader(jobConf, partitionKeys, fieldTypes,
 					selectedFields, split, HiveShimLoader.loadHiveShim(hiveVersion));
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedOrcSplitReader.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedOrcSplitReader.java
index 9dbad8a..c1aa2d1 100644
---
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedOrcSplitReader.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveVectorizedOrcSplitReader.java @@ -43,6 +43,7 @@ public class HiveVectorizedOrcSplitReader implements SplitReader { private OrcColumnarRowSplitReader reader; public HiveVectorizedOrcSplitReader( + String hiveVersion, JobConf jobConf, String[] fieldNames, DataType[] fieldTypes, @@ -62,6 +63,7 @@ public class HiveVectorizedOrcSplitReader implements SplitReader { } this.reader = genPartColumnarRowReader( + hiveVersion, conf, fieldNames, fieldTypes, diff --git a/flink-formats/flink-orc/pom.xml b/flink-formats/flink-orc/pom.xml index 4df500c..e05cee0 100644 --- a/flink-formats/flink-orc/pom.xml +++ b/flink-formats/flink-orc/pom.xml @@ -73,7 +73,7 @@ under the License. <dependency> <groupId>org.apache.orc</groupId> <artifactId>orc-core</artifactId> - <version>1.4.3</version> + <version>${orc.version}</version> <exclusions> <!-- Exclude ORC's Hadoop dependency and pull in Flink's shaded Hadoop. --> <exclusion> @@ -91,6 +91,12 @@ under the License. <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>provided</scope> + </dependency> + <!-- test dependencies --> <dependency> diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java index ffdbd05..bea6331 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcColumnarRowSplitReader.java @@ -19,6 +19,7 @@ package org.apache.flink.orc; import org.apache.flink.core.fs.Path; +import org.apache.flink.orc.shim.OrcShim; import org.apache.flink.table.dataformat.BaseRow; import org.apache.flink.table.dataformat.ColumnarRow; import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch; @@ -41,6 +42,7 @@ public class OrcColumnarRowSplitReader extends OrcSplitReader<BaseRow> { private final ColumnarRow row; public OrcColumnarRowSplitReader( + OrcShim shim, Configuration conf, TypeDescription schema, int[] selectedFields, @@ -51,6 +53,7 @@ public class OrcColumnarRowSplitReader extends OrcSplitReader<BaseRow> { long splitStart, long splitLength) throws IOException { super( + shim, conf, schema, selectedFields, diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowSplitReader.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowSplitReader.java index 1f2bdf9..b4f9697 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowSplitReader.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcRowSplitReader.java @@ -19,6 +19,7 @@ package org.apache.flink.orc; import org.apache.flink.core.fs.Path; +import org.apache.flink.orc.shim.OrcShim; import org.apache.flink.types.Row; import org.apache.hadoop.conf.Configuration; @@ -46,7 +47,7 @@ public class OrcRowSplitReader extends OrcSplitReader<Row> { Path path, long splitStart, long splitLength) throws IOException { - super(conf, schema, selectedFields, conjunctPredicates, batchSize, path, splitStart, splitLength); + super(OrcShim.defaultShim(), conf, schema, selectedFields, conjunctPredicates, batchSize, path, splitStart, splitLength); this.schema = schema; 
this.selectedFields = selectedFields; // create and initialize the row batch diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReader.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReader.java index e4930ae..b85bcb5 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReader.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReader.java @@ -19,21 +19,16 @@ package org.apache.flink.orc; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.Path; +import org.apache.flink.orc.shim.OrcShim; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.orc.OrcConf; -import org.apache.orc.OrcFile; -import org.apache.orc.Reader; import org.apache.orc.RecordReader; -import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; import java.io.Closeable; @@ -51,20 +46,21 @@ import java.util.List; */ public abstract class OrcSplitReader<T> implements Closeable { + private final OrcShim shim; + // the ORC reader private RecordReader orcRowsReader; // the vectorized row data to be read in a batch protected final VectorizedRowBatch rowBatch; - private final Reader.Options options; - // the number of rows in the current batch private int rowsInBatch; // the index of the next row to return protected int nextRow; public OrcSplitReader( + OrcShim shim, Configuration conf, TypeDescription schema, int[] selectedFields, @@ -73,47 +69,21 @@ public abstract class OrcSplitReader<T> implements Closeable { Path path, long splitStart, long splitLength) throws IOException { - // open ORC file and create reader - org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(path.getPath()); - Reader orcReader = OrcFile.createReader(hPath, OrcFile.readerOptions(conf)); - - // get offset and length for the stripes that start in the split - Tuple2<Long, Long> offsetAndLength = getOffsetAndLengthForSplit( - splitStart, splitLength, getStripes(orcReader)); - - // create ORC row reader configuration - this.options = orcReader.options() - .schema(schema) - .range(offsetAndLength.f0, offsetAndLength.f1) - .useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf)) - .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)) - .tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf)); - - // configure filters - if (!conjunctPredicates.isEmpty()) { - SearchArgument.Builder b = SearchArgumentFactory.newBuilder(); - b = b.startAnd(); - for (Predicate predicate : conjunctPredicates) { - predicate.add(b); - } - b = b.end(); - options.searchArgument(b.build(), new String[]{}); - } + this.shim = shim; + this.orcRowsReader = shim.createRecordReader( + conf, schema, selectedFields, conjunctPredicates, path, splitStart, splitLength); - // configure selected fields - options.include(computeProjectionMask(schema, selectedFields)); - - // create ORC row reader - this.orcRowsReader = orcReader.rows(options); - - // assign ids - schema.getId(); // create row batch this.rowBatch = schema.createRowBatch(batchSize); rowsInBatch = 0; nextRow = 0; } + @VisibleForTesting + 
public RecordReader getRecordReader() { + return orcRowsReader; + } + /** * Method used to check if the end of the input is reached. * @@ -154,7 +124,7 @@ public abstract class OrcSplitReader<T> implements Closeable { // No more rows available in the Rows array. nextRow = 0; // Try to read the next batch if rows from the ORC file. - boolean moreRows = orcRowsReader.nextBatch(rowBatch); + boolean moreRows = shim.nextBatch(orcRowsReader, rowBatch); if (moreRows) { // Load the data into the Rows array. @@ -166,57 +136,6 @@ public abstract class OrcSplitReader<T> implements Closeable { return true; } - private Tuple2<Long, Long> getOffsetAndLengthForSplit( - long splitStart, long splitLength, List<StripeInformation> stripes) { - long splitEnd = splitStart + splitLength; - long readStart = Long.MAX_VALUE; - long readEnd = Long.MIN_VALUE; - - for (StripeInformation s : stripes) { - if (splitStart <= s.getOffset() && s.getOffset() < splitEnd) { - // stripe starts in split, so it is included - readStart = Math.min(readStart, s.getOffset()); - readEnd = Math.max(readEnd, s.getOffset() + s.getLength()); - } - } - - if (readStart < Long.MAX_VALUE) { - // at least one split is included - return Tuple2.of(readStart, readEnd - readStart); - } else { - return Tuple2.of(0L, 0L); - } - } - - @VisibleForTesting - Reader.Options getOptions() { - return options; - } - - @VisibleForTesting - List<StripeInformation> getStripes(Reader orcReader) { - return orcReader.getStripes(); - } - - /** - * Computes the ORC projection mask of the fields to include from the selected fields.rowOrcInputFormat.nextRecord(null). - * - * @return The ORC projection mask. - */ - private boolean[] computeProjectionMask(TypeDescription schema, int[] selectedFields) { - // mask with all fields of the schema - boolean[] projectionMask = new boolean[schema.getMaximumId() + 1]; - // for each selected field - for (int inIdx : selectedFields) { - // set all nested fields of a selected field to true - TypeDescription fieldSchema = schema.getChildren().get(inIdx); - for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) { - projectionMask[i] = true; - } - } - return projectionMask; - } - @Override public void close() throws IOException { if (orcRowsReader != null) { @@ -233,7 +152,7 @@ public abstract class OrcSplitReader<T> implements Closeable { * A filter predicate that can be evaluated by the OrcInputFormat. 
*/ public abstract static class Predicate implements Serializable { - protected abstract SearchArgument.Builder add(SearchArgument.Builder builder); + public abstract SearchArgument.Builder add(SearchArgument.Builder builder); } abstract static class ColumnPredicate extends Predicate { @@ -334,7 +253,7 @@ public abstract class OrcSplitReader<T> implements Closeable { } @Override - protected SearchArgument.Builder add(SearchArgument.Builder builder) { + public SearchArgument.Builder add(SearchArgument.Builder builder) { return builder.equals(columnName, literalType, castLiteral(literal)); } @@ -360,7 +279,7 @@ public abstract class OrcSplitReader<T> implements Closeable { } @Override - protected SearchArgument.Builder add(SearchArgument.Builder builder) { + public SearchArgument.Builder add(SearchArgument.Builder builder) { return builder.nullSafeEquals(columnName, literalType, castLiteral(literal)); } @@ -386,7 +305,7 @@ public abstract class OrcSplitReader<T> implements Closeable { } @Override - protected SearchArgument.Builder add(SearchArgument.Builder builder) { + public SearchArgument.Builder add(SearchArgument.Builder builder) { return builder.lessThan(columnName, literalType, castLiteral(literal)); } @@ -412,7 +331,7 @@ public abstract class OrcSplitReader<T> implements Closeable { } @Override - protected SearchArgument.Builder add(SearchArgument.Builder builder) { + public SearchArgument.Builder add(SearchArgument.Builder builder) { return builder.lessThanEquals(columnName, literalType, castLiteral(literal)); } @@ -437,7 +356,7 @@ public abstract class OrcSplitReader<T> implements Closeable { } @Override - protected SearchArgument.Builder add(SearchArgument.Builder builder) { + public SearchArgument.Builder add(SearchArgument.Builder builder) { return builder.isNull(columnName, literalType); } @@ -469,7 +388,7 @@ public abstract class OrcSplitReader<T> implements Closeable { } @Override - protected SearchArgument.Builder add(SearchArgument.Builder builder) { + public SearchArgument.Builder add(SearchArgument.Builder builder) { return builder.between(columnName, literalType, castLiteral(lowerBound), castLiteral(upperBound)); } @@ -498,7 +417,7 @@ public abstract class OrcSplitReader<T> implements Closeable { } @Override - protected SearchArgument.Builder add(SearchArgument.Builder builder) { + public SearchArgument.Builder add(SearchArgument.Builder builder) { Object[] castedLiterals = new Object[literals.length]; for (int i = 0; i < literals.length; i++) { castedLiterals[i] = castLiteral(literals[i]); @@ -527,7 +446,7 @@ public abstract class OrcSplitReader<T> implements Closeable { this.pred = predicate; } - protected SearchArgument.Builder add(SearchArgument.Builder builder) { + public SearchArgument.Builder add(SearchArgument.Builder builder) { return pred.add(builder.startNot()).end(); } @@ -557,7 +476,7 @@ public abstract class OrcSplitReader<T> implements Closeable { } @Override - protected SearchArgument.Builder add(SearchArgument.Builder builder) { + public SearchArgument.Builder add(SearchArgument.Builder builder) { SearchArgument.Builder withOr = builder.startOr(); for (Predicate p : preds) { withOr = p.add(withOr); diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java index 550e5d2..f3f6e91 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java +++ 
b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.orc;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.orc.OrcColumnarRowSplitReader.ColumnBatchGenerator;
+import org.apache.flink.orc.shim.OrcShim;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.dataformat.vector.ColumnVector;
 import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch;
@@ -54,6 +55,7 @@ public class OrcSplitReaderUtil {
 	 * Util for generating partitioned {@link OrcColumnarRowSplitReader}.
 	 */
 	public static OrcColumnarRowSplitReader genPartColumnarRowReader(
+			String hiveVersion,
 			Configuration conf,
 			String[] fullFieldNames,
 			DataType[] fullFieldTypes,
@@ -89,6 +91,7 @@ public class OrcSplitReaderUtil {
 		};
 		return new OrcColumnarRowSplitReader(
+				OrcShim.createShim(hiveVersion),
 				conf,
 				convertToOrcTypeWithPart(fullFieldNames, fullFieldTypes, partitionSpec.keySet()),
 				selectedOrcFields,
diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShim.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShim.java
new file mode 100644
index 0000000..ca4fd2e
--- /dev/null
+++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShim.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.shim;
+
+import org.apache.flink.orc.OrcSplitReader;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * A shim layer to support ORC with the different dependent ORC versions of Hive.
+ */
+public interface OrcShim extends Serializable {
+
+	/**
+	 * Creates an ORC {@link RecordReader} from the conf, schema, selected fields, etc.
+	 */
+	RecordReader createRecordReader(
+			Configuration conf,
+			TypeDescription schema,
+			int[] selectedFields,
+			List<OrcSplitReader.Predicate> conjunctPredicates,
+			org.apache.flink.core.fs.Path path,
+			long splitStart,
+			long splitLength) throws IOException;
+
+	/**
+	 * Reads the next row batch.
+	 */
+	boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) throws IOException;
+
+	/**
+	 * Default shim for the bundled ORC dependency; uses the v2.3.0 shim.
+	 */
+	static OrcShim defaultShim() {
+		return new OrcShimV230();
+	}
+
+	/**
+	 * Creates a shim from the Hive version.
+ */ + static OrcShim createShim(String hiveVersion) { + if (hiveVersion.startsWith("2.0")) { + return new OrcShimV200(); + } else if (hiveVersion.startsWith("2.1")) { + return new OrcShimV210(); + } else if (hiveVersion.startsWith("2.2") || + hiveVersion.startsWith("2.3") || + hiveVersion.startsWith("3.")) { + return new OrcShimV230(); + } else { + throw new UnsupportedOperationException( + "Unsupported hive version for orc shim: " + hiveVersion); + } + } +} diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV200.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV200.java new file mode 100644 index 0000000..3e59a75 --- /dev/null +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV200.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc.shim; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.orc.OrcSplitReader.Predicate; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.orc.OrcConf; +import org.apache.orc.Reader; +import org.apache.orc.RecordReader; +import org.apache.orc.StripeInformation; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; + +import static org.apache.commons.lang3.reflect.ConstructorUtils.invokeConstructor; +import static org.apache.commons.lang3.reflect.MethodUtils.invokeExactMethod; +import static org.apache.commons.lang3.reflect.MethodUtils.invokeStaticMethod; + +/** + * Shim orc for Hive version 2.0.0 and upper versions. 
+ */ +public class OrcShimV200 implements OrcShim { + + private static final long serialVersionUID = 1L; + + private transient Method hasNextMethod; + private transient Method nextBatchMethod; + + protected Reader createReader(Path path, Configuration conf) throws IOException { + try { + Class orcFileClass = Class.forName("org.apache.hadoop.hive.ql.io.orc.OrcFile"); + Object readerOptions = invokeStaticMethod(orcFileClass, "readerOptions", conf); + + Class readerClass = Class.forName("org.apache.hadoop.hive.ql.io.orc.ReaderImpl"); + //noinspection unchecked + return (Reader) invokeConstructor(readerClass, path, readerOptions); + } catch (ClassNotFoundException | + NoSuchMethodException | + IllegalAccessException | + InstantiationException | + InvocationTargetException e) { + throw new IOException(e); + } + } + + protected RecordReader createRecordReader(Reader reader, Reader.Options options) throws IOException { + try { + return (RecordReader) invokeExactMethod(reader, "rowsOptions", options); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { + throw new IOException(e); + } + } + + protected Reader.Options readOrcConf(Reader.Options options, Configuration conf) { + return options.useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(conf)) + .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)); + } + + @Override + public RecordReader createRecordReader( + Configuration conf, + TypeDescription schema, + int[] selectedFields, + List<Predicate> conjunctPredicates, + org.apache.flink.core.fs.Path path, + long splitStart, + long splitLength) throws IOException { + // open ORC file and create reader + Path hPath = new Path(path.toUri()); + + Reader orcReader = createReader(hPath, conf); + + // get offset and length for the stripes that start in the split + Tuple2<Long, Long> offsetAndLength = getOffsetAndLengthForSplit( + splitStart, splitLength, orcReader.getStripes()); + + // create ORC row reader configuration + Reader.Options options = readOrcConf( + new Reader.Options().schema(schema).range(offsetAndLength.f0, offsetAndLength.f1), + conf); + + // configure filters + if (!conjunctPredicates.isEmpty()) { + SearchArgument.Builder b = SearchArgumentFactory.newBuilder(); + b = b.startAnd(); + for (Predicate predicate : conjunctPredicates) { + predicate.add(b); + } + b = b.end(); + options.searchArgument(b.build(), new String[]{}); + } + + // configure selected fields + options.include(computeProjectionMask(schema, selectedFields)); + + // create ORC row reader + RecordReader orcRowsReader = createRecordReader(orcReader, options); + + // assign ids + schema.getId(); + + return orcRowsReader; + } + + @Override + public boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) throws IOException { + try { + if (hasNextMethod == null) { + hasNextMethod = Class.forName("org.apache.hadoop.hive.ql.io.orc.RecordReader") + .getMethod("hasNext"); + hasNextMethod.setAccessible(true); + } + if (nextBatchMethod == null) { + nextBatchMethod = RecordReader.class.getMethod("nextBatch", VectorizedRowBatch.class); + nextBatchMethod.setAccessible(true); + } + boolean hasNext = (boolean) hasNextMethod.invoke(reader); + if (hasNext) { + nextBatchMethod.invoke(reader, rowBatch); + return true; + } else { + return false; + } + } catch (IllegalAccessException | + InvocationTargetException | + NoSuchMethodException | + ClassNotFoundException e) { + throw new IOException(e); + } + } + + @VisibleForTesting + public static Tuple2<Long, Long> getOffsetAndLengthForSplit( + 
long splitStart, long splitLength, List<StripeInformation> stripes) { + long splitEnd = splitStart + splitLength; + long readStart = Long.MAX_VALUE; + long readEnd = Long.MIN_VALUE; + + for (StripeInformation s : stripes) { + if (splitStart <= s.getOffset() && s.getOffset() < splitEnd) { + // stripe starts in split, so it is included + readStart = Math.min(readStart, s.getOffset()); + readEnd = Math.max(readEnd, s.getOffset() + s.getLength()); + } + } + + if (readStart < Long.MAX_VALUE) { + // at least one split is included + return Tuple2.of(readStart, readEnd - readStart); + } else { + return Tuple2.of(0L, 0L); + } + } + + /** + * Computes the ORC projection mask of the fields to include from the selected fields.rowOrcInputFormat.nextRecord(null). + * + * @return The ORC projection mask. + */ + private static boolean[] computeProjectionMask(TypeDescription schema, int[] selectedFields) { + // mask with all fields of the schema + boolean[] projectionMask = new boolean[schema.getMaximumId() + 1]; + // for each selected field + for (int inIdx : selectedFields) { + // set all nested fields of a selected field to true + TypeDescription fieldSchema = schema.getChildren().get(inIdx); + for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) { + projectionMask[i] = true; + } + } + return projectionMask; + } + +} diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV210.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV210.java new file mode 100644 index 0000000..90be3bb --- /dev/null +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV210.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc.shim; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.RecordReader; + +import java.io.IOException; + +/** + * Shim orc for Hive version 2.1.0 and upper versions. + */ +public class OrcShimV210 extends OrcShimV200 { + + private static final long serialVersionUID = 1L; + + @Override + public boolean nextBatch(RecordReader reader, VectorizedRowBatch rowBatch) throws IOException { + return reader.nextBatch(rowBatch); + } +} diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV230.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV230.java new file mode 100644 index 0000000..6df366e --- /dev/null +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/shim/OrcShimV230.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.shim;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.orc.OrcConf;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+
+import java.io.IOException;
+
+/**
+ * ORC shim for Hive version 2.3.0 and higher.
+ *
+ * <p>From Hive 2.3 on, the ORC API is basically stable, so we can call it directly.
+ *
+ * <p>Since Hive 2.2 does not bundle the ORC classes, this shim (based on the ORC of Hive 2.3) is also used to read from Hive 2.2.
+ */
+public class OrcShimV230 extends OrcShimV210 {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	protected Reader createReader(Path path,
+			Configuration conf) throws IOException {
+		return OrcFile.createReader(path, OrcFile.readerOptions(conf));
+	}
+
+	@Override
+	protected RecordReader createRecordReader(Reader reader, Reader.Options options) throws IOException {
+		return reader.rows(options);
+	}
+
+	@Override
+	protected Reader.Options readOrcConf(Reader.Options options, Configuration conf) {
+		return super.readOrcConf(options, conf)
+				.tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf));
+	}
+}
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
index cf93206..83513ba 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcColumnarRowSplitReaderTest.java
@@ -402,6 +402,7 @@ public class OrcColumnarRowSplitReaderTest {
 			Map<String, Object> partitionSpec,
 			FileInputSplit split) throws IOException {
 		return OrcSplitReaderUtil.genPartColumnarRowReader(
+				"2.3.0",
 				new Configuration(),
 				IntStream.range(0, fullTypes.length)
 						.mapToObj(i -> "f" + i).toArray(String[]::new),
diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
index f85d964..9481c72 100644
--- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
+++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
@@ -21,22 +21,22 @@ package org.apache.flink.orc;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.MapTypeInfo;
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path; import org.apache.flink.types.Row; import org.apache.flink.util.InstantiationUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; -import org.apache.orc.Reader; +import org.apache.orc.RecordReader; import org.apache.orc.StripeInformation; -import org.apache.orc.TypeDescription; +import org.apache.orc.impl.RecordReaderImpl; +import org.apache.orc.impl.SchemaEvolution; import org.junit.After; -import org.junit.Assert; import org.junit.Test; import java.io.FileNotFoundException; @@ -47,8 +47,9 @@ import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.stream.IntStream; +import static org.apache.commons.lang3.reflect.FieldUtils.readDeclaredField; +import static org.apache.flink.orc.shim.OrcShimV200.getOffsetAndLengthForSplit; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -176,7 +177,7 @@ public class OrcRowInputFormatTest { } @Test - public void testProjectionMaskNested() throws IOException{ + public void testProjectionMaskNested() throws Exception { rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration()); @@ -196,11 +197,16 @@ public class OrcRowInputFormatTest { true, true, true, true, true, // nested field 9 is in false, false, false, false, // nested field 10 is out true, true, true, true, true}; // nested field 11 is in - assertArrayEquals(expected, spy.getReader().getOptions().getInclude()); + assertArrayEquals(expected, getInclude(spy.getReader().getRecordReader())); + } + + private static boolean[] getInclude(RecordReader reader) throws IllegalAccessException { + SchemaEvolution evolution = (SchemaEvolution) readDeclaredField(reader, "evolution", true); + return evolution.getReaderIncluded(); } @Test - public void testSplitStripesGivenSplits() throws IOException { + public void testSplitStripesGivenSplits() throws Exception { rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration()); @@ -210,37 +216,34 @@ public class OrcRowInputFormatTest { spy.openInputFormat(); spy.open(splits[0]); - assertOptions(spy.getReader(), 3L, 137005L); + assertOffsetAndLen(spy.getReader(), 3L, 137005L); spy.open(splits[1]); - assertOptions(spy.getReader(), 137008L, 136182L); + assertOffsetAndLen(spy.getReader(), 137008L, 136182L); spy.open(splits[2]); - assertOptions(spy.getReader(), 273190L, 123633L); + assertOffsetAndLen(spy.getReader(), 273190L, 123633L); } - private static void assertOptions(OrcSplitReader reader, long offset, long length) { - Reader.Options options = reader.getOptions(); - Assert.assertEquals(offset, options.getOffset()); - Assert.assertEquals(length, options.getLength()); + @SuppressWarnings("unchecked") + private static List<StripeInformation> getStripes(RecordReader reader) throws IllegalAccessException { + return (List<StripeInformation>) readDeclaredField(reader, "stripes", true); } - private static OrcRowSplitReader newReaderWithStripes( - List<StripeInformation> stripes, FileInputSplit split) throws IOException { - TypeDescription td = TypeDescription.fromString(TEST_SCHEMA_FLAT); - return new OrcRowSplitReader( - new Configuration(), - td, - IntStream.range(0, td.getChildren().size()).toArray(), - new ArrayList<>(), - 1024, - split.getPath(), - split.getStart(), - 
split.getLength()) { - - @Override - List<StripeInformation> getStripes(Reader orcReader) { - return stripes; + private static void assertOffsetAndLen( + OrcSplitReader reader, long offset, long length) throws IllegalAccessException { + List<StripeInformation> stripes = getStripes(reader.getRecordReader()); + long min = Long.MAX_VALUE; + long max = Long.MIN_VALUE; + for (StripeInformation stripe : stripes) { + if (stripe.getOffset() < min) { + min = stripe.getOffset(); } - }; + if (stripe.getOffset() + stripe.getLength() > max) { + max = stripe.getOffset() + stripe.getLength(); + } + } + + assertEquals(offset, min); + assertEquals(length, max - min); } @Test @@ -269,21 +272,17 @@ public class OrcRowInputFormatTest { stripes.add(stripe5); // split ranging 2 stripes - OrcRowSplitReader reader = newReaderWithStripes( - stripes, new FileInputSplit(0, new Path(getPath(TEST_FILE_FLAT)), 0, 150, new String[]{})); - assertOptions(reader, 10L, 190L); + assertEquals(new Tuple2<>(10L, 190L), getOffsetAndLengthForSplit(0, 150, stripes)); + // split ranging 0 stripes - reader = newReaderWithStripes( - stripes, new FileInputSplit(1, new Path(getPath(TEST_FILE_FLAT)), 150, 10, new String[]{})); - assertOptions(reader, 0L, 0L); + assertEquals(new Tuple2<>(0L, 0L), getOffsetAndLengthForSplit(150, 10, stripes)); + // split ranging 1 stripe - reader = newReaderWithStripes( - stripes, new FileInputSplit(2, new Path(getPath(TEST_FILE_FLAT)), 160, 41, new String[]{})); - assertOptions(reader, 200L, 100L); + assertEquals(new Tuple2<>(200L, 100L), getOffsetAndLengthForSplit(160, 41, stripes)); + // split ranging 2 stripe - reader = newReaderWithStripes( - stripes, new FileInputSplit(3, new Path(getPath(TEST_FILE_FLAT)), 201, 299, new String[]{})); - assertOptions(reader, 300L, 200L); + assertEquals(new Tuple2<>(300L, 200L), getOffsetAndLengthForSplit(201, 299, stripes)); + } @Test @@ -421,7 +420,7 @@ public class OrcRowInputFormatTest { spy.open(splits[0]); // verify predicate configuration - SearchArgument sarg = spy.getReader().getOptions().getSearchArgument(); + SearchArgument sarg = getSearchArgument(spy.getReader().getRecordReader()); assertNotNull(sarg); assertEquals("(and leaf-0 leaf-1 leaf-2 leaf-3 leaf-4 leaf-5 leaf-6 leaf-7 leaf-8)", sarg.getExpression().toString()); assertEquals(9, sarg.getLeaves().size()); @@ -437,6 +436,12 @@ public class OrcRowInputFormatTest { assertEquals("(EQUALS string1 hello)", leaves.get(8).toString()); } + private static SearchArgument getSearchArgument(RecordReader reader) throws IllegalAccessException { + RecordReaderImpl.SargApplier applier = + (RecordReaderImpl.SargApplier) readDeclaredField(reader, "sargApp", true); + return (SearchArgument) readDeclaredField(applier, "sarg", true); + } + @Test public void testTimePredicates() throws Exception { rowOrcInputFormat = @@ -461,7 +466,7 @@ public class OrcRowInputFormatTest { spy.open(splits[0]); // verify predicate configuration - SearchArgument sarg = spy.getReader().getOptions().getSearchArgument(); + SearchArgument sarg = getSearchArgument(spy.getReader().getRecordReader()); assertNotNull(sarg); assertEquals("(or leaf-0 leaf-1)", sarg.getExpression().toString()); assertEquals(2, sarg.getLeaves().size()); @@ -490,7 +495,7 @@ public class OrcRowInputFormatTest { spy.open(splits[0]); // verify predicate configuration - SearchArgument sarg = spy.getReader().getOptions().getSearchArgument(); + SearchArgument sarg = getSearchArgument(spy.getReader().getRecordReader()); assertNotNull(sarg); assertEquals("(not leaf-0)", 
sarg.getExpression().toString()); assertEquals(1, sarg.getLeaves().size()); diff --git a/pom.xml b/pom.xml index 20e48b2..c4343e8 100644 --- a/pom.xml +++ b/pom.xml @@ -146,6 +146,7 @@ under the License. <minikdc.version>3.2.0</minikdc.version> <generated.docs.dir>./docs/_includes/generated</generated.docs.dir> <hive.version>2.3.4</hive.version> + <orc.version>1.4.3</orc.version> <!-- Hive 2.3.4 relies on Hadoop 2.7.2 and later versions. For Hadoop 2.7, the minor Hadoop version supported for flink-shaded-hadoop-2-uber is 2.7.5
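
As a quick orientation to the new API, the following is a minimal sketch (not part of the patch) of how the shim introduced here is meant to be used: the Hive version string selects the shim implementation, and the shim hides the version-specific way of opening the ORC file and pulling vectorized batches. The class name, file path, schema, and version string below are made-up examples, and the version-specific ORC/Hive classes are assumed to be on the classpath.

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.shim.OrcShim;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;

import java.util.ArrayList;

public class OrcShimUsageSketch {

	public static void main(String[] args) throws Exception {
		// Hive 2.0.x -> reflection-based OrcShimV200; 2.2.x/2.3.x/3.x -> OrcShimV230.
		OrcShim shim = OrcShim.createShim("2.0.1");

		// Hypothetical schema and file; replace with a real ORC file to run.
		TypeDescription schema = TypeDescription.fromString("struct<id:bigint,name:string>");
		Path path = new Path("/tmp/example.orc");

		RecordReader reader = shim.createRecordReader(
				new Configuration(),
				schema,
				new int[]{0, 1},        // project both top-level columns
				new ArrayList<>(),      // no filter predicates pushed down
				path,
				0L,                     // split start
				Long.MAX_VALUE);        // split length (whole file)

		VectorizedRowBatch batch = schema.createRowBatch(1024);
		while (shim.nextBatch(reader, batch)) {
			// batch.size rows are now populated in batch.cols; process them here.
		}
		reader.close();
	}
}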
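And a quick sanity check of the version dispatch in OrcShim.createShim, written as a JUnit 4 test in the style of the existing flink-orc tests. This test is not part of the patch and the class name is made up; it only restates the mapping implemented in createShim above.

import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.orc.shim.OrcShimV200;
import org.apache.flink.orc.shim.OrcShimV210;
import org.apache.flink.orc.shim.OrcShimV230;

import org.junit.Test;

import static org.junit.Assert.assertTrue;

public class OrcShimDispatchSketchTest {

	@Test
	public void testCreateShimByHiveVersion() {
		// Hive 2.0.x uses the fully reflection-based shim.
		assertTrue(OrcShim.createShim("2.0.1") instanceof OrcShimV200);
		// Hive 2.1.x calls nextBatch directly but still opens readers via reflection.
		assertTrue(OrcShim.createShim("2.1.1") instanceof OrcShimV210);
		// Hive 2.2.x, 2.3.x and 3.x all map to the direct ORC API shim.
		assertTrue(OrcShim.createShim("2.2.0") instanceof OrcShimV230);
		assertTrue(OrcShim.createShim("2.3.4") instanceof OrcShimV230);
		assertTrue(OrcShim.createShim("3.1.1") instanceof OrcShimV230);
	}
}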