This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f56bf324c80 HIVE-28763: Iceberg: Support functions while expiring snapshots. (#5643). (Ayush Saxena, reviewed by Shohei Okumiya)
f56bf324c80 is described below
commit f56bf324c807eaa2ab5912cc69036d190cc9af4b
Author: Ayush Saxena <[email protected]>
AuthorDate: Tue Feb 18 22:34:18 2025 +0530
HIVE-28763: Iceberg: Support functions while expiring snapshots. (#5643). (Ayush Saxena, reviewed by Shohei Okumiya)
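
With this change, ALTER TABLE ... EXECUTE EXPIRE_SNAPSHOTS accepts expressions rather than only string literals, both in the single-argument form and on either side of BETWEEN. The statements below mirror the new test; the table name is illustrative, and any expression that constant-folds to a date or timestamp should work:

    ALTER TABLE source EXECUTE EXPIRE_SNAPSHOTS(DATE('1985-10-10'))
    ALTER TABLE source EXECUTE EXPIRE_SNAPSHOTS(TIMESTAMP('1987-10-10 10:15:23.386'))
    ALTER TABLE source EXECUTE EXPIRE_SNAPSHOTS(CURRENT_DATE + 5)
    ALTER TABLE source EXECUTE EXPIRE_SNAPSHOTS(CURRENT_TIMESTAMP)
    ALTER TABLE source EXECUTE EXPIRE_SNAPSHOTS BETWEEN (CURRENT_DATE - 1) AND (CURRENT_TIMESTAMP)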
---
.../mr/hive/TestHiveIcebergExpireSnapshots.java | 43 ++++++++++++++++++++++
.../hadoop/hive/ql/parse/AlterClauseParser.g | 10 ++++-
.../table/execute/AlterTableExecuteAnalyzer.java | 31 +++++++++++++---
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 +-
4 files changed, 77 insertions(+), 9 deletions(-)
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
index 9f036a5615a..ac8eeeb71b2 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergExpireSnapshots.java
@@ -27,6 +27,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -146,6 +147,48 @@ public void testExpireSnapshotsWithDefaultParams() throws IOException, Interrupt
}
+  @Test
+  public void testExpireSnapshotsWithFunction() throws IOException, InterruptedException {
+    TableIdentifier identifier = TableIdentifier.of("default", "source");
+    Table table =
+        testTables.createTableWithVersions(shell, identifier.name(),
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, fileFormat,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 5);
+    Assert.assertEquals(5, table.history().size());
+    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(DATE('1985-10-10'))");
+    table.refresh();
+    Assert.assertEquals(5, table.history().size());
+    shell.executeStatement(
+        "ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(TIMESTAMP('1987-10-10 10:15:23.386'))");
+    table.refresh();
+    Assert.assertEquals(5, table.history().size());
+    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(CURRENT_DATE + 5)");
+    table.refresh();
+    Assert.assertEquals(1, table.history().size());
+    testTables.appendIcebergTable(shell.getHiveConf(), table, fileFormat, null,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    table.refresh();
+    Assert.assertEquals(2, table.history().size());
+    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS(CURRENT_TIMESTAMP)");
+    table.refresh();
+    Assert.assertEquals(1, table.history().size());
+
+    // Test with between keyword
+    testTables.appendIcebergTable(shell.getHiveConf(), table, fileFormat, null,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    table.refresh();
+    Assert.assertEquals(2, table.history().size());
+    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS000000");
+    String toTime = simpleDateFormat.format(new Date(table.history().get(0).timestampMillis()));
+    shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS BETWEEN " +
+        "(CURRENT_DATE - 1) AND '" + toTime + "'");
+    table.refresh();
+    Assert.assertEquals(1, IterableUtils.size(table.snapshots()));
+    AssertHelpers.assertThrows("Invalid timestamp expression", IllegalArgumentException.class, () ->
+        shell.executeStatement("ALTER TABLE " + identifier.name() + " EXECUTE EXPIRE_SNAPSHOTS BETWEEN " +
+            "(RAND()) AND '" + toTime + "'"));
+  }
+
@Test
public void testDeleteOrphanFiles() throws IOException, InterruptedException {
TableIdentifier identifier = TableIdentifier.of("default", "source");
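
A note on the outcomes asserted above: an expiry cutoff older than every snapshot is a no-op, while a cutoff in the future retains only the current snapshot. Sketched with the same statements (table name illustrative):

    ALTER TABLE source EXECUTE EXPIRE_SNAPSHOTS(DATE('1985-10-10'))  -- no-op: every snapshot is newer, history stays at 5
    ALTER TABLE source EXECUTE EXPIRE_SNAPSHOTS(CURRENT_DATE + 5)    -- expires all but the current snapshot, history drops to 1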
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index b896e3d35b1..5c4fb550d03 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -493,7 +493,7 @@ alterStatementSuffixExecute
@after { gParent.popMsg(state); }
: KW_EXECUTE KW_ROLLBACK LPAREN (rollbackParam=(StringLiteral | Number)) RPAREN
-> ^(TOK_ALTERTABLE_EXECUTE KW_ROLLBACK $rollbackParam)
- | KW_EXECUTE KW_EXPIRE_SNAPSHOTS (LPAREN (expireParam=StringLiteral) RPAREN)?
+ | KW_EXECUTE KW_EXPIRE_SNAPSHOTS (LPAREN (expireParam=expression) RPAREN)?
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $expireParam?)
| KW_EXECUTE KW_SET_CURRENT_SNAPSHOT LPAREN (snapshotParam=expression) RPAREN
-> ^(TOK_ALTERTABLE_EXECUTE KW_SET_CURRENT_SNAPSHOT $snapshotParam)
@@ -501,7 +501,8 @@ alterStatementSuffixExecute
-> ^(TOK_ALTERTABLE_EXECUTE KW_FAST_FORWARD $sourceBranch $targetBranch?)
| KW_EXECUTE KW_CHERRY_PICK snapshotId=Number
-> ^(TOK_ALTERTABLE_EXECUTE KW_CHERRY_PICK $snapshotId)
- | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN (fromTimestamp=StringLiteral) KW_AND (toTimestamp=StringLiteral)
+ | KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_BETWEEN
+ fromTimestamp=timestampExpression KW_AND toTimestamp=timestampExpression
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS $fromTimestamp $toTimestamp)
| KW_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN KW_LAST numToRetain=Number
-> ^(TOK_ALTERTABLE_EXECUTE KW_EXPIRE_SNAPSHOTS KW_RETAIN $numToRetain)
@@ -509,6 +510,11 @@ alterStatementSuffixExecute
-> ^(TOK_ALTERTABLE_EXECUTE KW_ORPHAN_FILES $timestamp?)
;
+timestampExpression
+ : StringLiteral -> StringLiteral
+ | LPAREN expression RPAREN -> expression
+ ;
+
alterStatementSuffixRenameBranch
@init { gParent.pushMsg("alter table rename branch", state); }
@after { gParent.popMsg(state); }
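
The timestampExpression rule introduced above keeps bare string literals working while routing any parenthesized form through the general expression grammar, so either bound of BETWEEN may now be computed. A sketch of both shapes (timestamp literals arbitrary):

    ALTER TABLE source EXECUTE EXPIRE_SNAPSHOTS BETWEEN '2025-01-01 00:00:00.000' AND '2025-02-01 00:00:00.000'
    ALTER TABLE source EXECUTE EXPIRE_SNAPSHOTS BETWEEN (CURRENT_DATE - 1) AND (CURRENT_TIMESTAMP)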
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
index 275a0e1a4c5..96b2bdac748 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/execute/AlterTableExecuteAnalyzer.java
@@ -39,7 +39,12 @@
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.ExpireSnapshotsSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.FastForwardSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec.RollbackSpec;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -85,7 +90,7 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
desc = getRollbackDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
break;
case HiveParser.KW_EXPIRE_SNAPSHOTS:
- desc = getExpireSnapshotDesc(tableName, partitionSpec, command.getChildren());
+ desc = getExpireSnapshotDesc(tableName, partitionSpec, command.getChildren(), queryState.getConf());
break;
case HiveParser.KW_SET_CURRENT_SNAPSHOT:
desc = getSetCurrentSnapshotDesc(tableName, partitionSpec, (ASTNode) command.getChild(1));
@@ -98,7 +103,7 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
break;
case HiveParser.KW_ORPHAN_FILES:
desc = getDeleteOrphanFilesDesc(tableName, partitionSpec, command.getChildren());
- break;
+ break;
}
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
@@ -139,7 +144,7 @@ private static AlterTableExecuteDesc getSetCurrentSnapshotDesc(TableName tableNa
}
private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName tableName, Map<String, String> partitionSpec,
- List<Node> children) throws SemanticException {
+ List<Node> children, HiveConf conf) throws SemanticException {
AlterTableExecuteSpec<ExpireSnapshotsSpec> spec;
if (children.size() == 1) {
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, null);
@@ -158,19 +163,33 @@ private static AlterTableExecuteDesc getExpireSnapshotDesc(TableName tableName,
} else if (children.size() == 3) {
ASTNode secondNode = (ASTNode) children.get(2);
String secondNodeText = PlanUtils.stripQuotes(secondNode.getText().trim());
- TimestampTZ fromTime = TimestampTZUtil.parse(firstNodeText, timeZone);
- TimestampTZ toTime = TimestampTZUtil.parse(secondNodeText, timeZone);
+ TimestampTZ fromTime = TimestampTZUtil.parse(getTimeStampString(conf, firstNode, firstNodeText), timeZone);
+ TimestampTZ toTime = TimestampTZUtil.parse(getTimeStampString(conf, secondNode, secondNodeText), timeZone);
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT,
new ExpireSnapshotsSpec(fromTime.toEpochMilli(), toTime.toEpochMilli()));
} else if (EXPIRE_SNAPSHOT_BY_ID_REGEX.matcher(firstNodeText).matches()) {
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(firstNodeText));
} else {
- TimestampTZ time = TimestampTZUtil.parse(firstNodeText, timeZone);
+ TimestampTZ time = TimestampTZUtil.parse(getTimeStampString(conf, firstNode, firstNodeText), timeZone);
spec = new AlterTableExecuteSpec(EXPIRE_SNAPSHOT, new ExpireSnapshotsSpec(time.toEpochMilli()));
}
return new AlterTableExecuteDesc(tableName, partitionSpec, spec);
}
+  private static String getTimeStampString(HiveConf conf, ASTNode node, String nodeText) throws SemanticException {
+    if (node.getChildCount() > 0) {
+      QueryState queryState = new QueryState.Builder().withGenerateNewQueryId(false).withHiveConf(conf).build();
+      SemanticAnalyzer sem = (SemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, node);
+      ExprNodeDesc desc = sem.genExprNodeDesc(node, new RowResolver(), false, true);
+      if (!(desc instanceof ExprNodeConstantDesc)) {
+        throw new SemanticException("Invalid timestamp expression");
+      }
+      ExprNodeConstantDesc constantDesc = (ExprNodeConstantDesc) desc;
+      return String.valueOf(constantDesc.getValue());
+    }
+    return nodeText;
+  }
+
private static AlterTableExecuteDesc getRollbackDesc(TableName tableName, Map<String, String> partitionSpec,
ASTNode childNode) throws SemanticException {
AlterTableExecuteSpec<RollbackSpec> spec;
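
On the analyzer side, when a parameter arrives as an expression subtree rather than a literal (node.getChildCount() > 0), getTimeStampString constant-folds it through genExprNodeDesc with foldExpr=true and requires an ExprNodeConstantDesc back. Deterministic expressions fold and are accepted; non-deterministic ones such as RAND() do not and are rejected with "Invalid timestamp expression", as the new test asserts. Illustrative pair (table name and literal arbitrary):

    ALTER TABLE source EXECUTE EXPIRE_SNAPSHOTS BETWEEN (CURRENT_DATE - 1) AND '2025-02-18 10:00:00.000'  -- folds to a constant, accepted
    ALTER TABLE source EXECUTE EXPIRE_SNAPSHOTS BETWEEN (RAND()) AND '2025-02-18 10:00:00.000'            -- does not fold, rejected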
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 26267654a4b..e8f0bd380e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -13533,7 +13533,7 @@ public ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input)
return genExprNodeDesc(expr, input, true, false);
}
- ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input, boolean useCaching,
+ public ExprNodeDesc genExprNodeDesc(ASTNode expr, RowResolver input, boolean useCaching,
boolean foldExpr) throws SemanticException {
TypeCheckCtx tcCtx = new TypeCheckCtx(input, useCaching, foldExpr);
return genExprNodeDesc(expr, input, tcCtx);
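
The SemanticAnalyzer change widens genExprNodeDesc(ASTNode, RowResolver, boolean, boolean) from package-private to public so that AlterTableExecuteAnalyzer, which lives in a different package, can call it for the constant folding above (with useCaching=false and foldExpr=true).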