shardulm94 commented on a change in pull request #933: URL: https://github.com/apache/iceberg/pull/933#discussion_r439086089
########## File path: mr/src/main/java/org/apache/iceberg/mr/IcebergRecordReader.java ########## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr; + +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.avro.DataReader; +import org.apache.iceberg.data.orc.GenericOrcReader; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.expressions.Evaluator; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.hadoop.HadoopInputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; + +public class IcebergRecordReader<T> { Review comment: Is it possible to reuse the IcebergRecordReader already implemented in 
https://github.com/apache/iceberg/blob/cad124967ceb740178a3f65b75f19017494b430d/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L280? It seems much more complete to me, e.g. it handles identity-partitioned data and multiple data models such as Pig. ########## File path: mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergInputFormat.java ########## @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.net.URI; +import java.util.Iterator; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; +import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.SnapshotsTable; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.mr.SerializationUtil; +import org.apache.iceberg.mr.mapred.iterables.SnapshotIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * CombineHiveInputFormat.AvoidSplitCombination is implemented to correctly delegate InputSplit + * creation to this class. 
See: https://stackoverflow.com/questions/29133275/ + * custom-inputformat-getsplits-never-called-in-hive + */ +public class IcebergInputFormat<T> implements InputFormat<Void, T>, CombineHiveInputFormat.AvoidSplitCombination { + private static final Logger LOG = LoggerFactory.getLogger(IcebergInputFormat.class); + + private Table table; + private long currentSnapshotId; + private String virtualSnapshotIdColumnName; + + @Override + public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException { + table = TableResolver.resolveTableFromJob(conf); + URI location = TableResolver.pathAsURI(conf.get(InputFormatConfig.TABLE_LOCATION)); + List<CombinedScanTask> tasks = planTasks(conf); + return createSplits(tasks, location.toString()); + } + + private List<CombinedScanTask> planTasks(JobConf conf) { + // Set defaults for virtual column + Snapshot currentSnapshot = table.currentSnapshot(); + if (currentSnapshot != null) { + currentSnapshotId = currentSnapshot.snapshotId(); + } + virtualSnapshotIdColumnName = SystemTableUtil.getVirtualColumnName(conf); + + String[] readColumns = ColumnProjectionUtils.getReadColumnNames(conf); + List<CombinedScanTask> tasks; + if (conf.get(TableScanDesc.FILTER_EXPR_CONF_STR) == null) { + tasks = Lists.newArrayList(table + .newScan() + .select(readColumns) + .planTasks()); + } else { + ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities + .deserializeObject(conf.get(TableScanDesc.FILTER_EXPR_CONF_STR), ExprNodeGenericFuncDesc.class); + SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprNodeDesc); + Expression filter = IcebergFilterFactory.generateFilterExpression(sarg); + + long snapshotIdToScan = extractSnapshotID(conf, exprNodeDesc); Review comment: I guess this may cause issues if the table itself contains a column called `snapshot__id`. @rdblue Do you think we should reserve a few column names (or a prefix) in the spec for these virtual columns? 
I guess such virtual columns are generally useful for supporting time travel/versioning/incremental scans in purely SQL engines. ########## File path: mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.mapred; + +import java.util.List; +import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.lessThan; +import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.apache.iceberg.expressions.Expressions.or; + +public class IcebergFilterFactory { + + private IcebergFilterFactory() { + } + + public static Expression generateFilterExpression(SearchArgument sarg) { + List<PredicateLeaf> leaves = sarg.getLeaves(); + List<ExpressionTree> childNodes = sarg.getExpression().getChildren(); + + switch (sarg.getExpression().getOperator()) { + case OR: + ExpressionTree orLeft = childNodes.get(0); + ExpressionTree orRight = childNodes.get(1); + return or(translate(orLeft, leaves), translate(orRight, leaves)); + case AND: + ExpressionTree andLeft = childNodes.get(0); + ExpressionTree andRight = childNodes.get(1); + if (childNodes.size() > 2) { Review comment: Couldn't an `OR` node also have more than two children? The `OR` branch above only translates the first two, so any remaining children would be silently dropped.
Also, maybe simplify this to ```java Expression result = Expressions.alwaysTrue(); for (ExpressionTree child : childNodes) { result = and(result, translate(child, leaves)); } ``` (and the same pattern with `Expressions.alwaysFalse()` and `or()` for the `OR` case). ########## File path: mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.iceberg.mr.mapred; + +import java.util.List; +import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.lessThan; +import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.apache.iceberg.expressions.Expressions.or; + +public class IcebergFilterFactory { + + private IcebergFilterFactory() { + } + + public static Expression generateFilterExpression(SearchArgument sarg) { + List<PredicateLeaf> leaves = sarg.getLeaves(); + List<ExpressionTree> childNodes = sarg.getExpression().getChildren(); + + switch (sarg.getExpression().getOperator()) { + case OR: + ExpressionTree orLeft = childNodes.get(0); + ExpressionTree orRight = childNodes.get(1); + return or(translate(orLeft, leaves), translate(orRight, leaves)); + case AND: + ExpressionTree andLeft = childNodes.get(0); + ExpressionTree andRight = childNodes.get(1); + if (childNodes.size() > 2) { + Expression[] evaluatedChildren = getLeftoverLeaves(childNodes, leaves); + return and( + translate(andLeft, leaves), translate(andRight, leaves), evaluatedChildren); + } else { + return and(translate(andLeft, leaves), translate(andRight, leaves)); + } + case NOT: + return not(translateLeaf(sarg.getLeaves().get(0))); + case LEAF: + return 
translateLeaf(sarg.getLeaves().get(0)); + case CONSTANT: + return null; + default: + throw new IllegalStateException("Unknown operator: " + sarg.getExpression().getOperator()); + } + } + + /** + * Remove first 2 nodes already evaluated and return an array of the evaluated leftover nodes. + * @param allChildNodes All child nodes to be evaluated for the AND expression. + * @param leaves All instances of the leaf nodes. + * @return Array of leftover evaluated nodes. + */ + private static Expression[] getLeftoverLeaves(List<ExpressionTree> allChildNodes, List<PredicateLeaf> leaves) { + allChildNodes.remove(0); + allChildNodes.remove(0); + + Expression[] evaluatedLeaves = new Expression[allChildNodes.size()]; + for (int i = 0; i < allChildNodes.size(); i++) { + Expression filter = translate(allChildNodes.get(i), leaves); + evaluatedLeaves[i] = filter; + } + return evaluatedLeaves; + } + + /** + * Recursive method to traverse down the ExpressionTree to evaluate each expression and its leaf nodes. + * @param tree Current ExpressionTree where the 'top' node is being evaluated. + * @param leaves List of all leaf nodes within the tree. + * @return Expression that is translated from the Hive SearchArgument. 
+ */ + private static Expression translate(ExpressionTree tree, List<PredicateLeaf> leaves) { + switch (tree.getOperator()) { + case OR: + return or(translate(tree.getChildren().get(0), leaves), + translate(tree.getChildren().get(1), leaves)); + case AND: + if (tree.getChildren().size() > 2) { + Expression[] evaluatedChildren = getLeftoverLeaves(tree.getChildren(), leaves); + return and(translate(tree.getChildren().get(0), leaves), + translate(tree.getChildren().get(1), leaves), evaluatedChildren); + } else { + return and(translate(tree.getChildren().get(0), leaves), + translate(tree.getChildren().get(1), leaves)); + } + case NOT: + return not(translate(tree.getChildren().get(0), leaves)); + case LEAF: + return translateLeaf(leaves.get(tree.getLeaf())); + case CONSTANT: + //We are unsure of how the CONSTANT case works, so using the approach of: + //https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ + // ParquetFilterPredicateConverter.java#L116 + return null; + default: + throw new IllegalStateException("Unknown operator: " + tree.getOperator()); + } + } + + /** + * Translate leaf nodes from Hive operator to Iceberg operator. + * @param leaf Leaf node + * @return Expression fully translated from Hive PredicateLeaf + */ + private static Expression translateLeaf(PredicateLeaf leaf) { + String column = leaf.getColumnName(); + if (column.equals("snapshot__id")) { + return Expressions.alwaysTrue(); + } + switch (leaf.getOperator()) { + case EQUALS: + return equal(column, leaf.getLiteral()); Review comment: We will need to convert literal values from the Hive data types to Iceberg data types. ########## File path: mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred; + +import java.util.List; +import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.lessThan; +import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.apache.iceberg.expressions.Expressions.or; + +public class IcebergFilterFactory { + + private IcebergFilterFactory() { + } + + public static Expression generateFilterExpression(SearchArgument sarg) { + List<PredicateLeaf> leaves = sarg.getLeaves(); + List<ExpressionTree> childNodes = 
sarg.getExpression().getChildren(); + + switch (sarg.getExpression().getOperator()) { + case OR: + ExpressionTree orLeft = childNodes.get(0); + ExpressionTree orRight = childNodes.get(1); + return or(translate(orLeft, leaves), translate(orRight, leaves)); + case AND: + ExpressionTree andLeft = childNodes.get(0); + ExpressionTree andRight = childNodes.get(1); + if (childNodes.size() > 2) { + Expression[] evaluatedChildren = getLeftoverLeaves(childNodes, leaves); + return and( + translate(andLeft, leaves), translate(andRight, leaves), evaluatedChildren); + } else { + return and(translate(andLeft, leaves), translate(andRight, leaves)); + } + case NOT: + return not(translateLeaf(sarg.getLeaves().get(0))); + case LEAF: + return translateLeaf(sarg.getLeaves().get(0)); + case CONSTANT: + return null; + default: + throw new IllegalStateException("Unknown operator: " + sarg.getExpression().getOperator()); + } + } + + /** + * Remove first 2 nodes already evaluated and return an array of the evaluated leftover nodes. + * @param allChildNodes All child nodes to be evaluated for the AND expression. + * @param leaves All instances of the leaf nodes. + * @return Array of leftover evaluated nodes. + */ + private static Expression[] getLeftoverLeaves(List<ExpressionTree> allChildNodes, List<PredicateLeaf> leaves) { Review comment: I guess this should be unnecessary if the logic for OR/AND operators is simplified to use a for loop as in my suggestion above. ########## File path: mr/src/main/java/org/apache/iceberg/mr/mapred/iterables/SnapshotIterable.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred.iterables; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; + +/** + * Creates an Iterable of Records with all snapshot metadata that can be used with the RecordReader. + */ +public class SnapshotIterable implements CloseableIterable { Review comment: Is this really needed? Can we reuse https://github.com/apache/iceberg/blob/cad124967ceb740178a3f65b75f19017494b430d/core/src/main/java/org/apache/iceberg/SnapshotsTable.java#L60 to get the iterable instead? ########## File path: mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred; + +import java.util.List; +import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.lessThan; +import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.apache.iceberg.expressions.Expressions.or; + +public class IcebergFilterFactory { + + private IcebergFilterFactory() { + } + + public static Expression generateFilterExpression(SearchArgument sarg) { + List<PredicateLeaf> leaves = sarg.getLeaves(); + List<ExpressionTree> childNodes = sarg.getExpression().getChildren(); + + switch (sarg.getExpression().getOperator()) { + case OR: + ExpressionTree orLeft = childNodes.get(0); + ExpressionTree orRight = childNodes.get(1); + return or(translate(orLeft, leaves), translate(orRight, leaves)); + case AND: + ExpressionTree andLeft = childNodes.get(0); 
+ ExpressionTree andRight = childNodes.get(1); + if (childNodes.size() > 2) { + Expression[] evaluatedChildren = getLeftoverLeaves(childNodes, leaves); + return and( + translate(andLeft, leaves), translate(andRight, leaves), evaluatedChildren); + } else { + return and(translate(andLeft, leaves), translate(andRight, leaves)); + } + case NOT: + return not(translateLeaf(sarg.getLeaves().get(0))); + case LEAF: + return translateLeaf(sarg.getLeaves().get(0)); + case CONSTANT: + return null; + default: + throw new IllegalStateException("Unknown operator: " + sarg.getExpression().getOperator()); + } + } + + /** + * Remove first 2 nodes already evaluated and return an array of the evaluated leftover nodes. + * @param allChildNodes All child nodes to be evaluated for the AND expression. + * @param leaves All instances of the leaf nodes. + * @return Array of leftover evaluated nodes. + */ + private static Expression[] getLeftoverLeaves(List<ExpressionTree> allChildNodes, List<PredicateLeaf> leaves) { + allChildNodes.remove(0); + allChildNodes.remove(0); + + Expression[] evaluatedLeaves = new Expression[allChildNodes.size()]; + for (int i = 0; i < allChildNodes.size(); i++) { + Expression filter = translate(allChildNodes.get(i), leaves); + evaluatedLeaves[i] = filter; + } + return evaluatedLeaves; + } + + /** + * Recursive method to traverse down the ExpressionTree to evaluate each expression and its leaf nodes. + * @param tree Current ExpressionTree where the 'top' node is being evaluated. + * @param leaves List of all leaf nodes within the tree. + * @return Expression that is translated from the Hive SearchArgument. 
+ */ + private static Expression translate(ExpressionTree tree, List<PredicateLeaf> leaves) { + switch (tree.getOperator()) { + case OR: + return or(translate(tree.getChildren().get(0), leaves), + translate(tree.getChildren().get(1), leaves)); + case AND: + if (tree.getChildren().size() > 2) { + Expression[] evaluatedChildren = getLeftoverLeaves(tree.getChildren(), leaves); + return and(translate(tree.getChildren().get(0), leaves), + translate(tree.getChildren().get(1), leaves), evaluatedChildren); + } else { + return and(translate(tree.getChildren().get(0), leaves), + translate(tree.getChildren().get(1), leaves)); + } + case NOT: + return not(translate(tree.getChildren().get(0), leaves)); + case LEAF: + return translateLeaf(leaves.get(tree.getLeaf())); + case CONSTANT: + //We are unsure of how the CONSTANT case works, so using the approach of: + //https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ + // ParquetFilterPredicateConverter.java#L116 + return null; Review comment: We return `null` here, but `Expressions.and()`, `Expressions.not()` and `Expressions.or()` have null checks in them, so it seems like this will fail regardless. Would it be better to throw a more readable exception here instead? ########## File path: mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred; + +import java.util.List; +import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.lessThan; +import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.apache.iceberg.expressions.Expressions.or; + +public class IcebergFilterFactory { + + private IcebergFilterFactory() { + } + + public static Expression generateFilterExpression(SearchArgument sarg) { Review comment: A lot of the code is duplicated between this method and `translate()`. Should we just call `translate(sarg.getExpression(), sarg.getLeaves())` here? ########## File path: mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred; + +import java.util.List; +import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.lessThan; +import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.notNull; +import static org.apache.iceberg.expressions.Expressions.or; + +public class IcebergFilterFactory { + + private IcebergFilterFactory() { + } + + public static Expression generateFilterExpression(SearchArgument sarg) { + List<PredicateLeaf> leaves = sarg.getLeaves(); + List<ExpressionTree> childNodes = 
sarg.getExpression().getChildren(); + + switch (sarg.getExpression().getOperator()) { + case OR: + ExpressionTree orLeft = childNodes.get(0); + ExpressionTree orRight = childNodes.get(1); + return or(translate(orLeft, leaves), translate(orRight, leaves)); + case AND: + ExpressionTree andLeft = childNodes.get(0); + ExpressionTree andRight = childNodes.get(1); + if (childNodes.size() > 2) { + Expression[] evaluatedChildren = getLeftoverLeaves(childNodes, leaves); + return and( + translate(andLeft, leaves), translate(andRight, leaves), evaluatedChildren); + } else { + return and(translate(andLeft, leaves), translate(andRight, leaves)); + } + case NOT: + return not(translateLeaf(sarg.getLeaves().get(0))); + case LEAF: + return translateLeaf(sarg.getLeaves().get(0)); + case CONSTANT: + return null; + default: + throw new IllegalStateException("Unknown operator: " + sarg.getExpression().getOperator()); + } + } + + /** + * Remove first 2 nodes already evaluated and return an array of the evaluated leftover nodes. + * @param allChildNodes All child nodes to be evaluated for the AND expression. + * @param leaves All instances of the leaf nodes. + * @return Array of leftover evaluated nodes. + */ + private static Expression[] getLeftoverLeaves(List<ExpressionTree> allChildNodes, List<PredicateLeaf> leaves) { + allChildNodes.remove(0); + allChildNodes.remove(0); + + Expression[] evaluatedLeaves = new Expression[allChildNodes.size()]; + for (int i = 0; i < allChildNodes.size(); i++) { + Expression filter = translate(allChildNodes.get(i), leaves); + evaluatedLeaves[i] = filter; + } + return evaluatedLeaves; + } + + /** + * Recursive method to traverse down the ExpressionTree to evaluate each expression and its leaf nodes. + * @param tree Current ExpressionTree where the 'top' node is being evaluated. + * @param leaves List of all leaf nodes within the tree. + * @return Expression that is translated from the Hive SearchArgument. 
+ */ + private static Expression translate(ExpressionTree tree, List<PredicateLeaf> leaves) { + switch (tree.getOperator()) { + case OR: + return or(translate(tree.getChildren().get(0), leaves), + translate(tree.getChildren().get(1), leaves)); + case AND: + if (tree.getChildren().size() > 2) { + Expression[] evaluatedChildren = getLeftoverLeaves(tree.getChildren(), leaves); + return and(translate(tree.getChildren().get(0), leaves), + translate(tree.getChildren().get(1), leaves), evaluatedChildren); + } else { + return and(translate(tree.getChildren().get(0), leaves), + translate(tree.getChildren().get(1), leaves)); + } + case NOT: + return not(translate(tree.getChildren().get(0), leaves)); + case LEAF: + return translateLeaf(leaves.get(tree.getLeaf())); + case CONSTANT: + //We are unsure of how the CONSTANT case works, so using the approach of: + //https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ + // ParquetFilterPredicateConverter.java#L116 + return null; + default: + throw new IllegalStateException("Unknown operator: " + tree.getOperator()); + } + } + + /** + * Translate leaf nodes from Hive operator to Iceberg operator. + * @param leaf Leaf node + * @return Expression fully translated from Hive PredicateLeaf + */ + private static Expression translateLeaf(PredicateLeaf leaf) { + String column = leaf.getColumnName(); + if (column.equals("snapshot__id")) { Review comment: Reference the `snapshot__id` constant defined in SystemTableUtil ########## File path: mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.mapred; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.mr.InputFormatConfig; + +final class TableResolver { Review comment: I don't see the need for having two table resolver classes. Can we combine them? The only difference I see is that this class uses `org.apache.hadoop.mapred.JobConf` instead of `org.apache.hadoop.mapreduce.JobConf`. It seems like we don't use any `JobConf` methods, and if we have to, we can create 2 methods instead of 2 classes. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org