This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 85b98a0  HIVE-25673: Column pruning fix for MR tasks (Peter Vary reviewed by Marton Bod) (#2765)
85b98a0 is described below

commit 85b98a011f98952f5c5755c7a0c036b48b2bd17a
Author: pvary <pv...@cloudera.com>
AuthorDate: Mon Nov 8 10:56:06 2021 +0100

    HIVE-25673: Column pruning fix for MR tasks (Peter Vary reviewed by Marton Bod) (#2765)
---
 .../iceberg/mr/hive/TestHiveIcebergSelects.java    | 26 ++++++++++++++++++++++
 .../org/apache/iceberg/mr/hive/TestTables.java     |  2 +-
 .../apache/hadoop/hive/ql/exec/MapOperator.java    | 16 +++++++------
 3 files changed, 36 insertions(+), 8 deletions(-)

diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
index 84e8b57..96e5dc2 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSelects.java
@@ -33,6 +33,7 @@ import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 /**
@@ -203,4 +204,29 @@ public class TestHiveIcebergSelects extends HiveIcebergStorageHandlerWithEngineB
     Assert.assertArrayEquals(new Object[] {0L, "Alice", "Brown"}, rows.get(0));
     Assert.assertArrayEquals(new Object[] {1L, "Bob", "Green"}, rows.get(1));
   }
+
+  /**
+   * Column pruning could become problematic when a single Map Task contains multiple TableScan operators where
+   * different columns are pruned. This only occurs on MR, as Tez initializes a separate Map task for each TableScan
+   * operator.
+   */
+  @Test
+  public void testMultiColumnPruning() throws IOException {
+    shell.setHiveSessionValue("hive.cbo.enable", true);
+
+    Schema schema1 = new Schema(optional(1, "fk", Types.StringType.get()));
+    List<Record> records1 = TestHelper.RecordsBuilder.newInstance(schema1).add("fk1").build();
+    testTables.createTable(shell, "table1", schema1, fileFormat, records1);
+
+    Schema schema2 = new Schema(optional(1, "fk", Types.StringType.get()), optional(2, "val", Types.StringType.get()));
+    List<Record> records2 = TestHelper.RecordsBuilder.newInstance(schema2).add("fk1", "val").build();
+    testTables.createTable(shell, "table2", schema2, fileFormat, records2);
+
+    // MR is needed for the reproduction
+    shell.setHiveSessionValue("hive.execution.engine", "mr");
+    String query = "SELECT t2.val FROM table1 t1 JOIN table2 t2 ON t1.fk = t2.fk";
+    List<Object[]> result = shell.executeStatement(query);
+    Assert.assertEquals(1, result.size());
+    Assert.assertArrayEquals(new Object[]{"val"}, result.get(0));
+  }
 }
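For context, the failure mode this test exercises can be illustrated with a short, self-contained sketch. The property name below matches ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, but the append helper is a simplified stand-in written for illustration, not the actual Hive implementation:

    import org.apache.hadoop.conf.Configuration;

    public class SharedConfPruningSketch {
      static final String READ_COLUMN_IDS = "hive.io.file.readcolumn.ids";

      // Simplified stand-in for the comma-separated append behavior.
      static void appendReadColumnIds(Configuration conf, String ids) {
        String old = conf.get(READ_COLUMN_IDS, "");
        conf.set(READ_COLUMN_IDS, old.isEmpty() ? ids : old + "," + ids);
      }

      public static void main(String[] args) {
        Configuration shared = new Configuration();
        appendReadColumnIds(shared, "0");   // table1's scan needs only "fk" (id 0)
        appendReadColumnIds(shared, "0,1"); // table2's scan needs "fk" and "val"
        // With one shared Configuration both readers see the merged value,
        // so neither table can tell which projected ids are actually its own.
        System.out.println(shared.get(READ_COLUMN_IDS)); // prints 0,0,1
      }
    }

This is why an MR Map task hosting both scans needs a per-table configuration, as the MapOperator change below provides.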
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 5a6f38c..85ba748 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -437,7 +437,7 @@ abstract class TestTables {
     }
 
     Assert.assertTrue(location.delete());
-    return location.toString();
+    return "file://" + location;
   }
 
   @Override
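The TestTables change makes the test table location an explicit file:// URI. A minimal sketch of why the scheme matters once the location becomes a Hadoop Path (the /tmp path here is hypothetical):

    import org.apache.hadoop.fs.Path;

    public class PathSchemeSketch {
      public static void main(String[] args) {
        // Without a scheme the path is resolved against fs.defaultFS,
        // which may not be the local filesystem inside an MR task.
        System.out.println(new Path("/tmp/iceberg_table").toUri().getScheme());        // null
        // With the scheme the location is unambiguous.
        System.out.println(new Path("file:///tmp/iceberg_table").toUri().getScheme()); // file
      }
    }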
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index ea8e634..358dbbb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -340,10 +339,10 @@ public class MapOperator extends AbstractMapOperator {
   /**
    * For each source table, combine the nested column pruning information from all its
    * table scan descriptors and set it in a configuration copy. This is necessary since
-   * the configuration property "READ_NESTED_COLUMN_PATH_CONF_STR" is set on a per-table
-   * basis, so we can't just use a single configuration for all the tables.
+   * the configuration properties are set on a per-table basis, so we can't just use a
+   * single configuration for all the tables.
    */
-  private Map<String, Configuration> cloneConfsForNestedColPruning(Configuration hconf) {
+  private Map<String, Configuration> cloneConfsForColPruning(Configuration hconf) {
     Map<String, Configuration> tableNameToConf = new HashMap<>();
 
     for (Map.Entry<Path, List<String>> e : conf.getPathToAliases().entrySet()) {
@@ -369,10 +368,13 @@ public class MapOperator extends AbstractMapOperator {
       if (!tableNameToConf.containsKey(tableName)) {
         Configuration clonedConf = new Configuration(hconf);
         clonedConf.unset(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
+        clonedConf.unset(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+        clonedConf.unset(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
         tableNameToConf.put(tableName, clonedConf);
       }
       Configuration newConf = tableNameToConf.get(tableName);
-      ColumnProjectionUtils.appendNestedColumnPaths(newConf, nestedColumnPaths);
+      ColumnProjectionUtils.appendReadColumns(newConf, tableScanDesc.getNeededColumnIDs(),
+          tableScanDesc.getOutputColumnNames(), tableScanDesc.getNeededNestedColumnPaths());
     }
   }
 
@@ -403,7 +405,7 @@ public class MapOperator extends AbstractMapOperator {
       throws SerDeException, Exception {
     setChildOperators(children);
 
-    Map<String, Configuration> tableNameToConf = cloneConfsForNestedColPruning(hconf);
+    Map<String, Configuration> tableNameToConf = cloneConfsForColPruning(hconf);
 
     for (Operator<?> child : children) {
       TableScanOperator tsOp = (TableScanOperator) child;
@@ -426,7 +428,7 @@ public class MapOperator extends AbstractMapOperator {
     List<Operator<? extends OperatorDesc>> children =
        new ArrayList<Operator<? extends OperatorDesc>>();
 
-    Map<String, Configuration> tableNameToConf = cloneConfsForNestedColPruning(hconf);
+    Map<String, Configuration> tableNameToConf = cloneConfsForColPruning(hconf);
     Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(tableNameToConf);
 
     for (Map.Entry<Path, List<String>> entry : conf.getPathToAliases().entrySet()) {
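The fix keys a cloned Configuration by table name and appends each TableScan's projection into its own clone. A minimal sketch of that pattern, with a hypothetical ScanInfo class standing in for TableScanDesc and the same simplified append as in the earlier sketch:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;

    public class PerTableConfSketch {
      static final String READ_COLUMN_IDS = "hive.io.file.readcolumn.ids";

      // Hypothetical stand-in for the fields read from TableScanDesc.
      static class ScanInfo {
        final String tableName;
        final String neededColumnIds;
        ScanInfo(String tableName, String neededColumnIds) {
          this.tableName = tableName;
          this.neededColumnIds = neededColumnIds;
        }
      }

      static Map<String, Configuration> cloneConfsForColPruning(Configuration hconf, List<ScanInfo> scans) {
        Map<String, Configuration> tableNameToConf = new HashMap<>();
        for (ScanInfo scan : scans) {
          // The first scan of a table gets a fresh clone with stale pruning unset.
          Configuration conf = tableNameToConf.computeIfAbsent(scan.tableName, t -> {
            Configuration cloned = new Configuration(hconf);
            cloned.unset(READ_COLUMN_IDS);
            return cloned;
          });
          // Later scans of the same table append into that table's clone only,
          // so different tables no longer pollute each other's projections.
          String old = conf.get(READ_COLUMN_IDS, "");
          conf.set(READ_COLUMN_IDS, old.isEmpty() ? scan.neededColumnIds : old + "," + scan.neededColumnIds);
        }
        return tableNameToConf;
      }
    }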