This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 5b30cd8 HIVE-25480: Fix Time Travel with CBO (#2602) (Peter Vary reviewed by Adam Szita) 5b30cd8 is described below commit 5b30cd879b8ef5a4aecbfcaf4366cb8608222909 Author: pvary <pv...@cloudera.com> AuthorDate: Tue Aug 31 10:42:36 2021 +0200 HIVE-25480: Fix Time Travel with CBO (#2602) (Peter Vary reviewed by Adam Szita) --- .../mr/hive/TestHiveIcebergSchemaEvolution.java | 7 +++--- .../org/apache/iceberg/mr/hive/TestHiveShell.java | 2 +- .../apache/hadoop/hive/ql/io/HiveInputFormat.java | 16 +++++++++--- .../org/apache/hadoop/hive/ql/metadata/Table.java | 22 ++++++++++------ .../optimizer/calcite/translator/ASTBuilder.java | 13 ++++++++++ .../hadoop/hive/ql/parse/CalcitePlanner.java | 3 ++- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 11 ++------ .../apache/hadoop/hive/ql/plan/TableScanDesc.java | 29 +++++----------------- 8 files changed, 55 insertions(+), 48 deletions(-) diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java index 96d7850..9b3c941 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSchemaEvolution.java @@ -127,7 +127,7 @@ public class TestHiveIcebergSchemaEvolution extends HiveIcebergStorageHandlerWit shell.executeStatement("ALTER TABLE orders CHANGE COLUMN " + "item fruit string"); List<Object[]> result = shell.executeStatement("SELECT customer_first_name, customer_last_name, SUM(quantity) " + - "FROM orders where price >= 3 group by customer_first_name, customer_last_name"); + "FROM orders where price >= 3 group by customer_first_name, customer_last_name order by customer_first_name"); assertQueryResult(result, 4, "Doctor", "Strange", 900L, @@ -140,7 +140,8 @@ public class TestHiveIcebergSchemaEvolution extends HiveIcebergStorageHandlerWit shell.executeStatement("ALTER TABLE orders ADD COLUMNS (nickname string)"); shell.executeStatement("INSERT INTO orders VALUES (7, 'Romanoff', 'Natasha', 3, 250, 'apple', 'Black Widow')"); result = shell.executeStatement("SELECT customer_first_name, customer_last_name, nickname, SUM(quantity) " + - " FROM orders where price >= 3 group by customer_first_name, customer_last_name, nickname"); + " FROM orders where price >= 3 group by customer_first_name, customer_last_name, nickname " + + " order by customer_first_name"); assertQueryResult(result, 5, "Doctor", "Strange", null, 900L, "Natasha", "Romanoff", "Black Widow", 250L, @@ -152,7 +153,7 @@ public class TestHiveIcebergSchemaEvolution extends HiveIcebergStorageHandlerWit shell.executeStatement("ALTER TABLE orders CHANGE COLUMN fruit fruit string AFTER nickname"); result = shell.executeStatement("SELECT customer_first_name, customer_last_name, nickname, fruit, SUM(quantity) " + " FROM orders where price >= 3 and fruit < 'o' group by customer_first_name, customer_last_name, nickname, " + - "fruit"); + "fruit order by customer_first_name"); assertQueryResult(result, 4, "Doctor", "Strange", null, "apple", 100L, "Natasha", "Romanoff", "Black Widow", "apple", 250L, diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java index b3c9440..3d39889 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveShell.java @@ -185,7 +185,7 @@ public class TestHiveShell { hiveConf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT, -1); // Switch off optimizers in order to contain the map reduction within this JVM - hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_ENABLED, false); + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_CBO_ENABLED, true); hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT, false); hiveConf.setBoolVar(HiveConf.ConfVars.HIVEMETADATAONLYQUERIES, false); hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, false); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index f48f65d..372f9af 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; +import org.apache.hadoop.hive.common.type.TimestampTZ; +import org.apache.hadoop.hive.common.type.TimestampTZUtil; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; @@ -46,6 +48,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.session.SessionState; @@ -75,6 +78,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -936,12 +940,16 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> protected static void pushAsOf(Configuration jobConf, TableScanOperator ts) { TableScanDesc scanDesc = ts.getConf(); - if (scanDesc.getAsOfTimestamp() != -1) { - jobConf.set(TableScanDesc.AS_OF_TIMESTAMP, Long.toString(scanDesc.getAsOfTimestamp())); + if (scanDesc.getAsOfTimestamp() != null) { + ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() : + SessionState.get().getConf().getLocalTimeZone(); + TimestampTZ time = TimestampTZUtil.parse(PlanUtils.stripQuotes(scanDesc.getAsOfTimestamp()), timeZone); + + jobConf.set(TableScanDesc.AS_OF_TIMESTAMP, Long.toString(time.toEpochMilli())); } - if (scanDesc.getAsOfVersion() != -1) { - jobConf.set(TableScanDesc.AS_OF_VERSION, Long.toString(scanDesc.getAsOfVersion())); + if (scanDesc.getAsOfVersion() != null) { + jobConf.set(TableScanDesc.AS_OF_VERSION, scanDesc.getAsOfVersion()); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java index 2ad1d17..c7549c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java @@ -28,6 +28,7 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.Set; @@ -128,12 +129,13 @@ public class Table implements Serializable { /** * The version of the table. For Iceberg tables this is the snapshotId. */ - private long asOfVersion = -1; + private String asOfVersion = null; /** - * The version of the table at the given timestamp. This is the epoch millisecond. + * The version of the table at the given timestamp. The format will be parsed with + * TimestampTZUtil.parse. */ - private long asOfTimestamp = -1; + private String asOfTimestamp = null; /** * Used only for serialization. @@ -567,6 +569,12 @@ public class Table implements Serializable { } else if (!tTable.equals(other.tTable)) { return false; } + if (!Objects.equals(asOfTimestamp, other.asOfTimestamp)) { + return false; + } + if (!Objects.equals(asOfVersion, other.asOfVersion)) { + return false; + } return true; } @@ -1282,19 +1290,19 @@ public class Table implements Serializable { return result; } - public long getAsOfVersion() { + public String getAsOfVersion() { return asOfVersion; } - public void setAsOfVersion(long asOfVersion) { + public void setAsOfVersion(String asOfVersion) { this.asOfVersion = asOfVersion; } - public long getAsOfTimestamp() { + public String getAsOfTimestamp() { return asOfTimestamp; } - public void setAsOfTimestamp(long asOfTimestamp) { + public void setAsOfTimestamp(String asOfTimestamp) { this.asOfTimestamp = asOfTimestamp; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java index 9fbdedb..264da23 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java @@ -31,6 +31,7 @@ import org.apache.calcite.util.TimeString; import org.apache.calcite.util.TimestampString; import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; +import org.apache.hadoop.hive.common.type.TimestampTZUtil; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable; import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan; @@ -86,6 +87,18 @@ public class ASTBuilder { ASTBuilder b = ASTBuilder.construct(HiveParser.TOK_TABREF, "TOK_TABREF").add(tableNameBuilder); + if (hTbl.getHiveTableMD().getAsOfTimestamp() != null) { + ASTBuilder asOfBuilder = ASTBuilder.construct(HiveParser.TOK_AS_OF_TIME, "TOK_AS_OF_TIME") + .add(HiveParser.StringLiteral, hTbl.getHiveTableMD().getAsOfTimestamp()); + b.add(asOfBuilder); + } + + if (hTbl.getHiveTableMD().getAsOfVersion() != null) { + ASTBuilder asOfBuilder = ASTBuilder.construct(HiveParser.TOK_AS_OF_VERSION, "TOK_AS_OF_VERSION") + .add(HiveParser.Number, hTbl.getHiveTableMD().getAsOfVersion()); + b.add(asOfBuilder); + } + ASTBuilder propList = ASTBuilder.construct(HiveParser.TOK_TABLEPROPLIST, "TOK_TABLEPROPLIST"); if (scan instanceof DruidQuery) { //Passing query spec, column names and column types to be used as part of Hive Physical execution diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java index 5a034ae..99c8045 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/CalcitePlanner.java @@ -530,7 +530,7 @@ public class CalcitePlanner extends SemanticAnalyzer { List<ASTNode> oldHints = new ArrayList<>(); // Cache the hints before CBO runs and removes them. // Use the hints later in top level QB. - getHintsFromQB(getQB(), oldHints); + getHintsFromQB(getQB(), oldHints); // Note: for now, we don't actually pass the queryForCbo to CBO, because // it accepts qb, not AST, and can also access all the private stuff in @@ -612,6 +612,7 @@ public class CalcitePlanner extends SemanticAnalyzer { if (!doPhase1(newAST, getQB(), ctx_1, null)) { throw new RuntimeException("Couldn't do phase1 on CBO optimized query plan"); } + // unfortunately making prunedPartitions immutable is not possible // here with SemiJoins not all tables are costed in CBO, so their // PartitionList is not evaluated until the run phase. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 1b53e51..385a54a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -2246,15 +2246,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (!Optional.ofNullable(tab.getStorageHandler()).map(HiveStorageHandler::isTimeTravelAllowed).orElse(false)) { throw new SemanticException(ErrorMsg.TIME_TRAVEL_NOT_ALLOWED, alias); } - if (asOf.getLeft() != null) { - tab.setAsOfVersion(Long.parseLong(asOf.getLeft())); - } - if (asOf.getRight() != null) { - ZoneId timeZone = SessionState.get() == null ? new HiveConf().getLocalTimeZone() : - SessionState.get().getConf().getLocalTimeZone(); - TimestampTZ ts = TimestampTZUtil.parse(stripQuotes(asOf.getRight()), timeZone); - tab.setAsOfTimestamp(ts.toEpochMilli()); - } + tab.setAsOfVersion(asOf.getLeft()); + tab.setAsOfTimestamp(asOf.getRight()); } if (tab.isView()) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java index a91cf56..ced417b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java @@ -141,9 +141,9 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD private int numBuckets = -1; - private long asOfVersion = -1; + private String asOfVersion = null; - private long asOfTimestamp = -1; + private String asOfTimestamp = null; public TableScanDesc() { this(null, null); @@ -536,30 +536,13 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD : storageHandler.getOperatorDescProperties(this, opProps); } - @Explain(displayName = "As of version", explainLevels = { Level.EXTENDED }) - public String getAsOfVersionText() { - if (asOfVersion != -1) { - return String.valueOf(asOfVersion); - } else { - return null; - } - } - - public long getAsOfVersion() { + @Explain(displayName = "As of version") + public String getAsOfVersion() { return asOfVersion; } - @Explain(displayName = "As of timestamp", explainLevels = { Level.EXTENDED }) - public String getAsOfTimestampText() { - if (asOfTimestamp != -1) { - DateFormat format = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss"); - return format.format(new Date(asOfTimestamp)); - } else { - return null; - } - } - - public long getAsOfTimestamp() { + @Explain(displayName = "As of timestamp") + public String getAsOfTimestamp() { return asOfTimestamp; }