shardulm94 commented on a change in pull request #933:
URL: https://github.com/apache/iceberg/pull/933#discussion_r439086089



##########
File path: mr/src/main/java/org/apache/iceberg/mr/IcebergRecordReader.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+
+public class IcebergRecordReader<T> {

Review comment:
       Is it possible to reuse the existing IcebergRecordReader? It is already 
implemented at 
https://github.com/apache/iceberg/blob/cad124967ceb740178a3f65b75f19017494b430d/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L280
 and seems much more extensive to me, e.g. it handles identity-partitioned data 
and multiple data models such as Pig.

##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergInputFormat.java
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotsTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.mr.mapred.iterables.SnapshotIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CombineHiveInputFormat.AvoidSplitCombination is implemented to correctly 
delegate InputSplit
+ * creation to this class. See: https://stackoverflow.com/questions/29133275/
+ * custom-inputformat-getsplits-never-called-in-hive
+ */
+public class IcebergInputFormat<T> implements InputFormat<Void, T>, 
CombineHiveInputFormat.AvoidSplitCombination {
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergInputFormat.class);
+
+  private Table table;
+  private long currentSnapshotId;
+  private String virtualSnapshotIdColumnName;
+
+  @Override
+  public InputSplit[] getSplits(JobConf conf, int numSplits) throws 
IOException {
+    table = TableResolver.resolveTableFromJob(conf);
+    URI location = 
TableResolver.pathAsURI(conf.get(InputFormatConfig.TABLE_LOCATION));
+    List<CombinedScanTask> tasks = planTasks(conf);
+    return createSplits(tasks, location.toString());
+  }
+
+  private List<CombinedScanTask> planTasks(JobConf conf) {
+    // Set defaults for virtual column
+    Snapshot currentSnapshot = table.currentSnapshot();
+    if (currentSnapshot != null) {
+      currentSnapshotId = currentSnapshot.snapshotId();
+    }
+    virtualSnapshotIdColumnName = SystemTableUtil.getVirtualColumnName(conf);
+
+    String[] readColumns = ColumnProjectionUtils.getReadColumnNames(conf);
+    List<CombinedScanTask> tasks;
+    if (conf.get(TableScanDesc.FILTER_EXPR_CONF_STR) == null) {
+      tasks = Lists.newArrayList(table
+              .newScan()
+              .select(readColumns)
+              .planTasks());
+    } else {
+      ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
+              .deserializeObject(conf.get(TableScanDesc.FILTER_EXPR_CONF_STR), 
ExprNodeGenericFuncDesc.class);
+      SearchArgument sarg = ConvertAstToSearchArg.create(conf, exprNodeDesc);
+      Expression filter = IcebergFilterFactory.generateFilterExpression(sarg);
+
+      long snapshotIdToScan = extractSnapshotID(conf, exprNodeDesc);

Review comment:
       I guess this may cause issues if the table itself contains a column 
called `snapshot__id`. @rdblue Do you think we should reserve a few column 
names (or a prefix) in the spec for these virtual columns? I guess such virtual 
columns are generally useful for supporting time travel/versioning/incremental 
scans in purely SQL engines.

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.util.List;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
+import static org.apache.iceberg.expressions.Expressions.isNull;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.notNull;
+import static org.apache.iceberg.expressions.Expressions.or;
+
+public class IcebergFilterFactory {
+
+  private IcebergFilterFactory() {
+  }
+
+  public static Expression generateFilterExpression(SearchArgument sarg) {
+    List<PredicateLeaf> leaves = sarg.getLeaves();
+    List<ExpressionTree> childNodes = sarg.getExpression().getChildren();
+
+    switch (sarg.getExpression().getOperator()) {
+      case OR:
+        ExpressionTree orLeft = childNodes.get(0);
+        ExpressionTree orRight = childNodes.get(1);
+        return or(translate(orLeft, leaves), translate(orRight, leaves));
+      case AND:
+        ExpressionTree andLeft = childNodes.get(0);
+        ExpressionTree andRight = childNodes.get(1);
+        if (childNodes.size() > 2) {

Review comment:
       Can this also be true for `OR`?
   
   Also, maybe simplify this to
   ```java
   Expression result = Expressions.alwaysTrue();
   for (ExpressionTree child : childNodes) {
      result = and(result, translate(child, leaves));
   }
   ```

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.util.List;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
+import static org.apache.iceberg.expressions.Expressions.isNull;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.notNull;
+import static org.apache.iceberg.expressions.Expressions.or;
+
+public class IcebergFilterFactory {
+
+  private IcebergFilterFactory() {
+  }
+
+  public static Expression generateFilterExpression(SearchArgument sarg) {
+    List<PredicateLeaf> leaves = sarg.getLeaves();
+    List<ExpressionTree> childNodes = sarg.getExpression().getChildren();
+
+    switch (sarg.getExpression().getOperator()) {
+      case OR:
+        ExpressionTree orLeft = childNodes.get(0);
+        ExpressionTree orRight = childNodes.get(1);
+        return or(translate(orLeft, leaves), translate(orRight, leaves));
+      case AND:
+        ExpressionTree andLeft = childNodes.get(0);
+        ExpressionTree andRight = childNodes.get(1);
+        if (childNodes.size() > 2) {
+          Expression[] evaluatedChildren = getLeftoverLeaves(childNodes, 
leaves);
+          return and(
+                  translate(andLeft, leaves), translate(andRight, leaves), 
evaluatedChildren);
+        } else {
+          return and(translate(andLeft, leaves), translate(andRight, leaves));
+        }
+      case NOT:
+        return not(translateLeaf(sarg.getLeaves().get(0)));
+      case LEAF:
+        return translateLeaf(sarg.getLeaves().get(0));
+      case CONSTANT:
+        return null;
+      default:
+        throw new IllegalStateException("Unknown operator: " + 
sarg.getExpression().getOperator());
+    }
+  }
+
+  /**
+   * Remove first 2 nodes already evaluated and return an array of the 
evaluated leftover nodes.
+   * @param allChildNodes All child nodes to be evaluated for the AND 
expression.
+   * @param leaves All instances of the leaf nodes.
+   * @return Array of leftover evaluated nodes.
+   */
+  private static Expression[] getLeftoverLeaves(List<ExpressionTree> 
allChildNodes, List<PredicateLeaf> leaves) {
+    allChildNodes.remove(0);
+    allChildNodes.remove(0);
+
+    Expression[] evaluatedLeaves = new Expression[allChildNodes.size()];
+    for (int i = 0; i < allChildNodes.size(); i++) {
+      Expression filter = translate(allChildNodes.get(i), leaves);
+      evaluatedLeaves[i] = filter;
+    }
+    return evaluatedLeaves;
+  }
+
+  /**
+   * Recursive method to traverse down the ExpressionTree to evaluate each 
expression and its leaf nodes.
+   * @param tree Current ExpressionTree where the 'top' node is being 
evaluated.
+   * @param leaves List of all leaf nodes within the tree.
+   * @return Expression that is translated from the Hive SearchArgument.
+   */
+  private static Expression translate(ExpressionTree tree, List<PredicateLeaf> 
leaves) {
+    switch (tree.getOperator()) {
+      case OR:
+        return or(translate(tree.getChildren().get(0), leaves),
+                translate(tree.getChildren().get(1), leaves));
+      case AND:
+        if (tree.getChildren().size() > 2) {
+          Expression[] evaluatedChildren = 
getLeftoverLeaves(tree.getChildren(), leaves);
+          return and(translate(tree.getChildren().get(0), leaves),
+                  translate(tree.getChildren().get(1), leaves), 
evaluatedChildren);
+        } else {
+          return and(translate(tree.getChildren().get(0), leaves),
+                  translate(tree.getChildren().get(1), leaves));
+        }
+      case NOT:
+        return not(translate(tree.getChildren().get(0), leaves));
+      case LEAF:
+        return translateLeaf(leaves.get(tree.getLeaf()));
+      case CONSTANT:
+        //We are unsure of how the CONSTANT case works, so using the approach 
of:
+        
//https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/
+        // ParquetFilterPredicateConverter.java#L116
+        return null;
+      default:
+        throw new IllegalStateException("Unknown operator: " + 
tree.getOperator());
+    }
+  }
+
+  /**
+   * Translate leaf nodes from Hive operator to Iceberg operator.
+   * @param leaf Leaf node
+   * @return Expression fully translated from Hive PredicateLeaf
+   */
+  private static Expression translateLeaf(PredicateLeaf leaf) {
+    String column = leaf.getColumnName();
+    if (column.equals("snapshot__id")) {
+      return Expressions.alwaysTrue();
+    }
+    switch (leaf.getOperator()) {
+      case EQUALS:
+        return equal(column, leaf.getLiteral());

Review comment:
       We will need to convert literal values from the Hive data types to 
Iceberg data types.

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.util.List;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
+import static org.apache.iceberg.expressions.Expressions.isNull;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.notNull;
+import static org.apache.iceberg.expressions.Expressions.or;
+
+public class IcebergFilterFactory {
+
+  private IcebergFilterFactory() {
+  }
+
+  public static Expression generateFilterExpression(SearchArgument sarg) {
+    List<PredicateLeaf> leaves = sarg.getLeaves();
+    List<ExpressionTree> childNodes = sarg.getExpression().getChildren();
+
+    switch (sarg.getExpression().getOperator()) {
+      case OR:
+        ExpressionTree orLeft = childNodes.get(0);
+        ExpressionTree orRight = childNodes.get(1);
+        return or(translate(orLeft, leaves), translate(orRight, leaves));
+      case AND:
+        ExpressionTree andLeft = childNodes.get(0);
+        ExpressionTree andRight = childNodes.get(1);
+        if (childNodes.size() > 2) {
+          Expression[] evaluatedChildren = getLeftoverLeaves(childNodes, 
leaves);
+          return and(
+                  translate(andLeft, leaves), translate(andRight, leaves), 
evaluatedChildren);
+        } else {
+          return and(translate(andLeft, leaves), translate(andRight, leaves));
+        }
+      case NOT:
+        return not(translateLeaf(sarg.getLeaves().get(0)));
+      case LEAF:
+        return translateLeaf(sarg.getLeaves().get(0));
+      case CONSTANT:
+        return null;
+      default:
+        throw new IllegalStateException("Unknown operator: " + 
sarg.getExpression().getOperator());
+    }
+  }
+
+  /**
+   * Remove first 2 nodes already evaluated and return an array of the 
evaluated leftover nodes.
+   * @param allChildNodes All child nodes to be evaluated for the AND 
expression.
+   * @param leaves All instances of the leaf nodes.
+   * @return Array of leftover evaluated nodes.
+   */
+  private static Expression[] getLeftoverLeaves(List<ExpressionTree> 
allChildNodes, List<PredicateLeaf> leaves) {

Review comment:
       I guess this should be unnecessary if the logic for OR/AND operators is 
simplified to use a for loop as in my suggestion above.

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/mapred/iterables/SnapshotIterable.java
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred.iterables;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+
+/**
+ * Creates an Iterable of Records with all snapshot metadata that can be used 
with the RecordReader.
+ */
+public class SnapshotIterable implements CloseableIterable {

Review comment:
       Is this really needed? Can we reuse 
https://github.com/apache/iceberg/blob/cad124967ceb740178a3f65b75f19017494b430d/core/src/main/java/org/apache/iceberg/SnapshotsTable.java#L60
 to get the iterable instead?

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.util.List;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
+import static org.apache.iceberg.expressions.Expressions.isNull;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.notNull;
+import static org.apache.iceberg.expressions.Expressions.or;
+
+public class IcebergFilterFactory {
+
+  private IcebergFilterFactory() {
+  }
+
+  public static Expression generateFilterExpression(SearchArgument sarg) {
+    List<PredicateLeaf> leaves = sarg.getLeaves();
+    List<ExpressionTree> childNodes = sarg.getExpression().getChildren();
+
+    switch (sarg.getExpression().getOperator()) {
+      case OR:
+        ExpressionTree orLeft = childNodes.get(0);
+        ExpressionTree orRight = childNodes.get(1);
+        return or(translate(orLeft, leaves), translate(orRight, leaves));
+      case AND:
+        ExpressionTree andLeft = childNodes.get(0);
+        ExpressionTree andRight = childNodes.get(1);
+        if (childNodes.size() > 2) {
+          Expression[] evaluatedChildren = getLeftoverLeaves(childNodes, 
leaves);
+          return and(
+                  translate(andLeft, leaves), translate(andRight, leaves), 
evaluatedChildren);
+        } else {
+          return and(translate(andLeft, leaves), translate(andRight, leaves));
+        }
+      case NOT:
+        return not(translateLeaf(sarg.getLeaves().get(0)));
+      case LEAF:
+        return translateLeaf(sarg.getLeaves().get(0));
+      case CONSTANT:
+        return null;
+      default:
+        throw new IllegalStateException("Unknown operator: " + 
sarg.getExpression().getOperator());
+    }
+  }
+
+  /**
+   * Remove first 2 nodes already evaluated and return an array of the 
evaluated leftover nodes.
+   * @param allChildNodes All child nodes to be evaluated for the AND 
expression.
+   * @param leaves All instances of the leaf nodes.
+   * @return Array of leftover evaluated nodes.
+   */
+  private static Expression[] getLeftoverLeaves(List<ExpressionTree> 
allChildNodes, List<PredicateLeaf> leaves) {
+    allChildNodes.remove(0);
+    allChildNodes.remove(0);
+
+    Expression[] evaluatedLeaves = new Expression[allChildNodes.size()];
+    for (int i = 0; i < allChildNodes.size(); i++) {
+      Expression filter = translate(allChildNodes.get(i), leaves);
+      evaluatedLeaves[i] = filter;
+    }
+    return evaluatedLeaves;
+  }
+
+  /**
+   * Recursive method to traverse down the ExpressionTree to evaluate each 
expression and its leaf nodes.
+   * @param tree Current ExpressionTree where the 'top' node is being 
evaluated.
+   * @param leaves List of all leaf nodes within the tree.
+   * @return Expression that is translated from the Hive SearchArgument.
+   */
+  private static Expression translate(ExpressionTree tree, List<PredicateLeaf> 
leaves) {
+    switch (tree.getOperator()) {
+      case OR:
+        return or(translate(tree.getChildren().get(0), leaves),
+                translate(tree.getChildren().get(1), leaves));
+      case AND:
+        if (tree.getChildren().size() > 2) {
+          Expression[] evaluatedChildren = 
getLeftoverLeaves(tree.getChildren(), leaves);
+          return and(translate(tree.getChildren().get(0), leaves),
+                  translate(tree.getChildren().get(1), leaves), 
evaluatedChildren);
+        } else {
+          return and(translate(tree.getChildren().get(0), leaves),
+                  translate(tree.getChildren().get(1), leaves));
+        }
+      case NOT:
+        return not(translate(tree.getChildren().get(0), leaves));
+      case LEAF:
+        return translateLeaf(leaves.get(tree.getLeaf()));
+      case CONSTANT:
+        //We are unsure of how the CONSTANT case works, so using the approach 
of:
+        
//https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/
+        // ParquetFilterPredicateConverter.java#L116
+        return null;

Review comment:
       We return `null` here, but `Expressions.and()`, `Expressions.not()` and 
`Expressions.or()` have null checks in them, so it seems like this will fail 
regardless. Would it be better to throw a more readable exception here instead?

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.util.List;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
+import static org.apache.iceberg.expressions.Expressions.isNull;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.notNull;
+import static org.apache.iceberg.expressions.Expressions.or;
+
+public class IcebergFilterFactory {
+
+  private IcebergFilterFactory() {
+  }
+
+  public static Expression generateFilterExpression(SearchArgument sarg) {

Review comment:
       A lot of the code is duplicated between this method and `translate()`. 
Should we just call `translate(sarg.getExpression(), sarg.getLeaves())` here?

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergFilterFactory.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.util.List;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+import static org.apache.iceberg.expressions.Expressions.and;
+import static org.apache.iceberg.expressions.Expressions.equal;
+import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.in;
+import static org.apache.iceberg.expressions.Expressions.isNull;
+import static org.apache.iceberg.expressions.Expressions.lessThan;
+import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
+import static org.apache.iceberg.expressions.Expressions.not;
+import static org.apache.iceberg.expressions.Expressions.notNull;
+import static org.apache.iceberg.expressions.Expressions.or;
+
+public class IcebergFilterFactory {
+
+  private IcebergFilterFactory() { // private: no instantiation from outside
+  }
+
+  /**
+   * Translate a Hive SearchArgument into the equivalent Iceberg filter.
+   * <p>
+   * Delegates to {@link #translate(ExpressionTree, List)} so that the root
+   * of the expression tree is handled exactly like every nested node.
+   *
+   * @param sarg Hive SearchArgument holding the expression tree and leaves.
+   * @return Iceberg Expression for the SearchArgument, or null when the
+   *         root of the tree is a CONSTANT node.
+   */
+  public static Expression generateFilterExpression(SearchArgument sarg) {
+    return translate(sarg.getExpression(), sarg.getLeaves());
+  }
+
+  /**
+   * Translate every child node after the first two, which the caller
+   * translates itself.
+   * <p>
+   * The list is only read, never modified: callers pass the list returned
+   * by ExpressionTree#getChildren() and still index into it afterwards.
+   *
+   * @param allChildNodes All child nodes of the AND expression.
+   * @param leaves All instances of the leaf nodes.
+   * @return Array of the translated leftover nodes, empty if there are
+   *         two children or fewer.
+   */
+  private static Expression[] getLeftoverLeaves(
+      List<ExpressionTree> allChildNodes, List<PredicateLeaf> leaves) {
+    int leftoverCount = Math.max(allChildNodes.size() - 2, 0);
+    Expression[] evaluatedLeaves = new Expression[leftoverCount];
+    for (int i = 0; i < leftoverCount; i++) {
+      evaluatedLeaves[i] = translate(allChildNodes.get(i + 2), leaves);
+    }
+    return evaluatedLeaves;
+  }
+
+  /**
+   * Recursive method to traverse down the ExpressionTree and translate
+   * each expression and its leaf nodes.
+   *
+   * @param tree Current ExpressionTree where the 'top' node is translated.
+   * @param leaves List of all leaf nodes within the tree.
+   * @return Expression that is translated from the Hive SearchArgument, or
+   *         null for a CONSTANT node.
+   */
+  private static Expression translate(
+      ExpressionTree tree, List<PredicateLeaf> leaves) {
+    List<ExpressionTree> children = tree.getChildren();
+    switch (tree.getOperator()) {
+      case OR:
+        // Fold over every child so that an OR with more than two children
+        // is not silently truncated to its first two.
+        Expression disjunction = translate(children.get(0), leaves);
+        for (int i = 1; i < children.size(); i++) {
+          disjunction = or(disjunction, translate(children.get(i), leaves));
+        }
+        return disjunction;
+      case AND:
+        Expression andLeft = translate(children.get(0), leaves);
+        Expression andRight = translate(children.get(1), leaves);
+        if (children.size() > 2) {
+          return and(andLeft, andRight, getLeftoverLeaves(children, leaves));
+        }
+        return and(andLeft, andRight);
+      case NOT:
+        return not(translate(children.get(0), leaves));
+      case LEAF:
+        // getLeaf() is the index of this node's predicate in 'leaves'.
+        return translateLeaf(leaves.get(tree.getLeaf()));
+      case CONSTANT:
+        // We are unsure of how the CONSTANT case works, so this follows
+        // the approach of Hive's ParquetFilterPredicateConverter:
+        // https://github.com/apache/hive/blob/master/ql/src/java/org/apache/
+        // hadoop/hive/ql/io/parquet/read/
+        // ParquetFilterPredicateConverter.java#L116
+        return null;
+      default:
+        throw new IllegalStateException(
+            "Unknown operator: " + tree.getOperator());
+    }
+  }
+
+  /**
+   * Translate leaf nodes from Hive operator to Iceberg operator.
+   * @param leaf Leaf node
+   * @return Expression fully translated from Hive PredicateLeaf
+   */
+  private static Expression translateLeaf(PredicateLeaf leaf) {
+    String column = leaf.getColumnName();
+    if (column.equals("snapshot__id")) {

Review comment:
       Reference the `snapshot__id` constant defined in SystemTableUtil

##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.InputFormatConfig;
+
+final class TableResolver {

Review comment:
       I don't see the need for having two table resolver classes. Can we 
combine them? The only difference I see is that this class uses 
`org.apache.hadoop.mapred.JobConf` instead of 
`org.apache.hadoop.mapreduce.JobConf`. It seems like we don't use any `JobConf` 
methods, and if we have to, we can create 2 methods instead of 2 classes.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to