This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new bb3592c52e6  HIVE-27503: Iceberg: Support query iceberg tag (Butao Zhang, reviewed by Denys Kuzmenko)
bb3592c52e6 is described below

commit bb3592c52e6df6c97d0d84373ab85d1eda5d0c23
Author: Butao Zhang <zhangbu...@cmss.chinamobile.com>
AuthorDate: Mon Jul 24 15:23:15 2023 +0800

    HIVE-27503: Iceberg: Support query iceberg tag (Butao Zhang, reviewed by Denys Kuzmenko)

    Closes #4482
---
 .../main/java/org/apache/iceberg/mr/Catalogs.java  |   2 +-
 .../org/apache/iceberg/mr/InputFormatConfig.java   |   2 +-
 .../iceberg/mr/hive/HiveIcebergInputFormat.java    |   2 +-
 .../mr/hive/HiveIcebergOutputCommitter.java        |  10 +-
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  16 +--
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |   6 +-
 .../mr/hive/TestHiveIcebergTagOperation.java       |  42 ++++++++
 .../src/test/queries/positive/query_iceberg_tag.q  |  20 ++++
 .../negative/write_iceberg_branch_negative.q.out   |   2 +-
 .../test/results/positive/query_iceberg_tag.q.out  | 118 +++++++++++++++++++++
 .../results/positive/write_iceberg_branch.q.out    |  10 +-
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  |   6 +-
 .../apache/hadoop/hive/ql/io/HiveInputFormat.java  |   4 +-
 .../apache/hadoop/hive/ql/metadata/HiveUtils.java  |  16 ++-
 .../org/apache/hadoop/hive/ql/metadata/Table.java  |  12 +--
 .../optimizer/calcite/translator/ASTBuilder.java   |   5 +-
 .../hive/ql/optimizer/ppr/PartitionPruner.java     |   4 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |  20 ++--
 .../apache/hadoop/hive/ql/plan/TableScanDesc.java  |  14 +--
 .../org/apache/hadoop/hive/common/TableName.java   |   5 +-
 20 files changed, 254 insertions(+), 62 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
index d422885becd..58dae8317d9 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -66,7 +66,7 @@ public final class Catalogs {

   public static final String NAME = "name";
   public static final String LOCATION = "location";
-  public static final String BRANCH_NAME = "branch_name";
+  public static final String SNAPSHOT_REF = "snapshot_ref";

   private static final String NO_CATALOG_TYPE = "no catalog";
   private static final Set<String> PROPERTIES_TO_REMOVE =
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index cf3450840a8..eb212766c7c 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -62,7 +62,7 @@ public class InputFormatConfig {
   public static final boolean CONFIG_SERIALIZATION_DISABLED_DEFAULT = true;
   public static final String OPERATION_TYPE_PREFIX = "iceberg.mr.operation.type.";
   public static final String OUTPUT_TABLES = "iceberg.mr.output.tables";
-  public static final String OUTPUT_TABLE_BRANCH = "iceberg.mr.output.table.branch";
+  public static final String OUTPUT_TABLE_SNAPSHOT_REF = "iceberg.mr.output.table.snapshot.ref";
   public static final String COMMIT_TABLE_THREAD_POOL_SIZE = "iceberg.mr.commit.table.thread.pool.size";
   public static final int COMMIT_TABLE_THREAD_POOL_SIZE_DEFAULT = 10;
   public static final String COMMIT_FILE_THREAD_POOL_SIZE = "iceberg.mr.commit.file.thread.pool.size";
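The two renames above generalize the old branch-only property to any Iceberg snapshot reference (branch or tag). Throughout the patch the ref travels in prefixed form ("branch_<name>" or "tag_<name>") and the prefix is stripped only at the Iceberg API boundary. A minimal standalone sketch of that convention, assuming only hadoop-common on the classpath (the class name is illustrative; the property value is the one from the InputFormatConfig hunk):

    import org.apache.hadoop.conf.Configuration;

    public class SnapshotRefConventionSketch {
      // property value copied from the InputFormatConfig hunk above
      static final String OUTPUT_TABLE_SNAPSHOT_REF = "iceberg.mr.output.table.snapshot.ref";

      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.set(OUTPUT_TABLE_SNAPSHOT_REF, "tag_test1");  // prefixed form travels in the job conf

        String prefixed = conf.get(OUTPUT_TABLE_SNAPSHOT_REF, "");
        // equivalent to HiveUtils.getTableSnapshotRef(prefixed) further down in this patch
        String refName = prefixed.replaceFirst("^(?:branch_|tag_)", "");
        System.out.println(refName);                       // prints: test1
      }
    }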
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 66883b02e5c..242d8d87e55 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -143,7 +143,7 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
     job.set(InputFormatConfig.AS_OF_TIMESTAMP, job.get(TableScanDesc.AS_OF_TIMESTAMP, "-1"));
     job.set(InputFormatConfig.SNAPSHOT_ID, job.get(TableScanDesc.AS_OF_VERSION, "-1"));
     job.set(InputFormatConfig.SNAPSHOT_ID_INTERVAL_FROM, job.get(TableScanDesc.FROM_VERSION, "-1"));
-    job.set(InputFormatConfig.OUTPUT_TABLE_BRANCH, job.get(TableScanDesc.BRANCH_NAME, ""));
+    job.set(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF, job.get(TableScanDesc.SNAPSHOT_REF, ""));

     String location = job.get(InputFormatConfig.TABLE_LOCATION);
     return Arrays.stream(super.getSplits(job, numSplits))
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 7a5a8e69652..1ac8a3225ec 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -433,7 +433,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     FilesForCommit writeResults = collectResults(
         numTasks, executor, outputTable.table.location(), jobContext, io, true);
-    String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_BRANCH);
+    String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
     if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
       if (writeResults.isEmpty()) {
         LOG.info(
@@ -459,7 +459,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       AppendFiles write = table.newAppend();
       results.dataFiles().forEach(write::appendFile);
       if (StringUtils.isNotEmpty(branchName)) {
-        write.toBranch(HiveUtils.getTableBranch(branchName));
+        write.toBranch(HiveUtils.getTableSnapshotRef(branchName));
       }
       write.commit();
     } else {
@@ -467,7 +467,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       results.dataFiles().forEach(write::addRows);
       results.deleteFiles().forEach(write::addDeletes);
       if (StringUtils.isNotEmpty(branchName)) {
-        write.toBranch(HiveUtils.getTableBranch(branchName));
+        write.toBranch(HiveUtils.getTableSnapshotRef(branchName));
       }
       write.commit();
     }
@@ -493,7 +493,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       ReplacePartitions overwrite = table.newReplacePartitions();
       results.dataFiles().forEach(overwrite::addFile);
       if (StringUtils.isNotEmpty(branchName)) {
-        overwrite.toBranch(HiveUtils.getTableBranch(branchName));
+        overwrite.toBranch(HiveUtils.getTableSnapshotRef(branchName));
       }
       overwrite.commit();
       LOG.info("Overwrite commit took {} ms for table: {} with {} file(s)", System.currentTimeMillis() - startTime,
@@ -502,7 +502,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
       DeleteFiles deleteFiles = table.newDelete();
       deleteFiles.deleteFromRowFilter(Expressions.alwaysTrue());
       if (StringUtils.isNotEmpty(branchName)) {
-        deleteFiles.toBranch(HiveUtils.getTableBranch(branchName));
+        deleteFiles.toBranch(HiveUtils.getTableSnapshotRef(branchName));
       }
       deleteFiles.commit();
       LOG.info("Cleared table contents as part of empty overwrite for unpartitioned table. " +
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 364d63c3a20..04660667540 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -682,12 +682,12 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
     String tableName = commitProperties.getProperty(Catalogs.NAME);
     String location = commitProperties.getProperty(Catalogs.LOCATION);
-    String branchName = commitProperties.getProperty(Catalogs.BRANCH_NAME);
+    String snapshotRef = commitProperties.getProperty(Catalogs.SNAPSHOT_REF);
     Configuration configuration = SessionState.getSessionConf();
     if (location != null) {
       HiveTableUtil.cleanupTableObjectFile(location, configuration);
     }
-    List<JobContext> jobContextList = generateJobContext(configuration, tableName, branchName, overwrite);
+    List<JobContext> jobContextList = generateJobContext(configuration, tableName, snapshotRef, overwrite);
     if (jobContextList.isEmpty()) {
       return;
     }
@@ -856,14 +856,14 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   @Override
   public org.apache.hadoop.hive.ql.metadata.Table checkAndSetTableMetaRef(
       org.apache.hadoop.hive.ql.metadata.Table hmsTable, String tableMetaRef) throws SemanticException {
-    String branch = HiveUtils.getTableBranch(tableMetaRef);
-    if (branch != null) {
+    String refName = HiveUtils.getTableSnapshotRef(tableMetaRef);
+    if (refName != null) {
       Table tbl = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
-      if (tbl.snapshot(branch) != null) {
-        hmsTable.setBranchName(tableMetaRef);
+      if (tbl.snapshot(refName) != null) {
+        hmsTable.setSnapshotRef(tableMetaRef);
         return hmsTable;
       }
-      throw new SemanticException(String.format("Cannot use branch (does not exist): %s", branch));
+      throw new SemanticException(String.format("Cannot use snapshotRef (does not exist): %s", refName));
     }
     if (IcebergMetadataTables.isValidMetaTable(tableMetaRef)) {
       hmsTable.setMetaTable(tableMetaRef);
@@ -1375,7 +1375,7 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
       // for multi-table inserts, this hook method will be called sequentially for each target table
       jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
       if (branchName != null) {
-        jobConf.set(InputFormatConfig.OUTPUT_TABLE_BRANCH, branchName);
+        jobConf.set(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF, branchName);
       }

       jobContextList.add(new JobContextImpl(jobConf, jobID, null));
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index af62c0514e2..e438879c1f7 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -128,9 +128,9 @@ public class IcebergInputFormat<T> extends InputFormat<Void, T> {
       }
       snapshotId = ref.snapshotId();
     }
-    String branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_BRANCH);
-    if (StringUtils.isNotEmpty(branchName)) {
-      scan = scan.useRef(HiveUtils.getTableBranch(branchName));
+    String refName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
+    if (StringUtils.isNotEmpty(refName)) {
+      scan = scan.useRef(HiveUtils.getTableSnapshotRef(refName));
     }
     if (snapshotId != -1) {
       scan = scan.useSnapshot(snapshotId);
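With the config generalized, IcebergInputFormat resolves branches and tags through a single code path: Iceberg's Scan#useRef accepts any named snapshot reference. A minimal sketch of how a scan gets pinned to a ref, assuming an already-loaded org.apache.iceberg.Table (the helper and class names are illustrative):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableScan;

    class ScanAtRefSketch {
      // Pins a scan to a named snapshot ref; Iceberg resolves the name against
      // both branches and tags, which is why one code path now serves both.
      static TableScan scanAtRef(Table table, String refName) {
        return table.newScan().useRef(refName);  // same call the patched input format makes
      }
    }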
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTagOperation.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTagOperation.java
index 23a7be72f1f..8e7148c8609 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTagOperation.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergTagOperation.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.mr.hive;

 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
@@ -132,4 +133,45 @@ public class TestHiveIcebergTagOperation extends HiveIcebergStorageHandlerWithEn
       Assert.assertTrue(e.getMessage().contains("Not an iceberg table"));
     }
   }
+
+  @Test
+  public void testQueryIcebergTag() throws IOException, InterruptedException {
+    Table table = testTables.createTableWithVersions(shell, "customers",
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);
+
+    long firstSnapshotId = table.history().get(0).snapshotId();
+    table.manageSnapshots().createTag("testtag", firstSnapshotId).commit();
+    List<Object[]> rows =
+        shell.executeStatement("SELECT * FROM default.customers.tag_testtag");
+
+    Assert.assertEquals(3, rows.size());
+
+    try {
+      shell.executeStatement("insert into default.customers.tag_testtag values (0L, \"Alice\", \"Brown\")");
+    } catch (Throwable e) {
+      while (e.getCause() != null) {
+        e = e.getCause();
+      }
+      Assert.assertTrue(e.getMessage().contains("Don't support write (insert/delete/update/merge) to iceberg tag"));
+    }
+
+    try {
+      shell.executeStatement("delete from default.customers.tag_testtag where customer_id=0L");
+    } catch (Throwable e) {
+      while (e.getCause() != null) {
+        e = e.getCause();
+      }
+      Assert.assertTrue(e.getMessage().contains("Don't support write (insert/delete/update/merge) to iceberg tag"));
+    }
+
+    try {
+      shell.executeStatement("update default.customers.tag_testtag set customer_id=0L where customer_id=0L");
+    } catch (Throwable e) {
+      while (e.getCause() != null) {
+        e = e.getCause();
+      }
+      Assert.assertTrue(e.getMessage().contains("Don't support write (insert/delete/update/merge) to iceberg tag"));
+    }
+  }
 }
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/query_iceberg_tag.q b/iceberg/iceberg-handler/src/test/queries/positive/query_iceberg_tag.q
new file mode 100644
index 00000000000..6ed25bbe842
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/positive/query_iceberg_tag.q
@@ -0,0 +1,20 @@
+-- SORT_QUERY_RESULTS
+set hive.explain.user=false;
+set hive.fetch.task.conversion=more;
+
+create external table ice01(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2');
+insert into ice01 values (1, 'one', 50), (2, 'two', 51), (111, 'one', 55);
+
+-- create a tag named test1
+alter table ice01 create tag test1;
+
+-- query tag using table identifier: db.tbl.tag_tagName
+explain select * from default.ice01.tag_test1;
+select * from default.ice01.tag_test1;
+
+-- query tag using non-fetch task
+set hive.fetch.task.conversion=none;
+explain select * from default.ice01.tag_test1;
+select * from default.ice01.tag_test1;
+
+drop table ice01;
\ No newline at end of file
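The .q file above only reads through the tag, which relies on a tag being an immutable named pointer to one snapshot: later writes to the table do not change what the tag returns. A hypothetical follow-on to the same ice01 example (not part of the patch) would behave like this:

    -- hypothetical continuation of the ice01 example above
    insert into ice01 values (3, 'three', 52);

    select count(*) from default.ice01.tag_test1;  -- still 3: the tag pins the old snapshot
    select count(*) from default.ice01;            -- 4: the live table sees the new row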
diff --git a/iceberg/iceberg-handler/src/test/results/negative/write_iceberg_branch_negative.q.out b/iceberg/iceberg-handler/src/test/results/negative/write_iceberg_branch_negative.q.out
index 0d76fc3030b..f24ab56e08c 100644
--- a/iceberg/iceberg-handler/src/test/results/negative/write_iceberg_branch_negative.q.out
+++ b/iceberg/iceberg-handler/src/test/results/negative/write_iceberg_branch_negative.q.out
@@ -6,4 +6,4 @@ POSTHOOK: query: create external table ice01(a int, b string, c int) stored by i
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@ice01
-FAILED: SemanticException Cannot use branch (does not exist): test1
+FAILED: SemanticException Cannot use snapshotRef (does not exist): test1
diff --git a/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_tag.q.out b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_tag.q.out
new file mode 100644
index 00000000000..dae649f023e
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/positive/query_iceberg_tag.q.out
@@ -0,0 +1,118 @@
+PREHOOK: query: create external table ice01(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@ice01
+POSTHOOK: query: create external table ice01(a int, b string, c int) stored by iceberg stored as orc tblproperties ('format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@ice01
+PREHOOK: query: insert into ice01 values (1, 'one', 50), (2, 'two', 51), (111, 'one', 55)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@ice01
+POSTHOOK: query: insert into ice01 values (1, 'one', 50), (2, 'two', 51), (111, 'one', 55)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@ice01
+PREHOOK: query: alter table ice01 create tag test1
+PREHOOK: type: ALTERTABLE_CREATETAG
+PREHOOK: Input: default@ice01
+POSTHOOK: query: alter table ice01 create tag test1
+POSTHOOK: type: ALTERTABLE_CREATETAG
+POSTHOOK: Input: default@ice01
+PREHOOK: query: explain select * from default.ice01.tag_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: explain select * from default.ice01.tag_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+  Stage-0 is a root stage
+
+STAGE PLANS:
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        TableScan
+          alias: ice01
+          Snapshot ref: tag_test1
+          Select Operator
+            expressions: a (type: int), b (type: string), c (type: int)
+            outputColumnNames: _col0, _col1, _col2
+            ListSink
+
+PREHOOK: query: select * from default.ice01.tag_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.ice01.tag_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	one	50
+111	one	55
+2	two	51
+PREHOOK: query: explain select * from default.ice01.tag_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: explain select * from default.ice01.tag_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: ice01
+                  Snapshot ref: tag_test1
+                  Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
+                  Select Operator
+                    expressions: a (type: int), b (type: string), c (type: int)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
+                      table:
+                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+            Execution mode: vectorized
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: select * from default.ice01.tag_test1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@ice01
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.ice01.tag_test1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	one	50
+111	one	55
+2	two	51
+PREHOOK: query: drop table ice01
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@ice01
+PREHOOK: Output: default@ice01
+POSTHOOK: query: drop table ice01
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@ice01
+POSTHOOK: Output: default@ice01
diff --git a/iceberg/iceberg-handler/src/test/results/positive/write_iceberg_branch.q.out b/iceberg/iceberg-handler/src/test/results/positive/write_iceberg_branch.q.out
index 4d99ee27029..45f097b0551 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/write_iceberg_branch.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/write_iceberg_branch.q.out
@@ -57,7 +57,7 @@ STAGE PLANS:
       Processor Tree:
         TableScan
           alias: ice01
-          branch name: branch_test1
+          Snapshot ref: branch_test1
           Select Operator
             expressions: a (type: int), b (type: string), c (type: int)
             outputColumnNames: _col0, _col1, _col2
@@ -235,8 +235,8 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: ice01
-                  branch name: branch_test1
                   filterExpr: (a = 22) (type: boolean)
+                  Snapshot ref: branch_test1
                   Statistics: Num rows: 3 Data size: 291 Basic stats: COMPLETE Column stats: COMPLETE
                   Filter Operator
                     predicate: (a = 22) (type: boolean)
@@ -333,8 +333,8 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: ice01
-                  branch name: branch_test1
                   filterExpr: (c = 66) (type: boolean)
+                  Snapshot ref: branch_test1
                   Statistics: Num rows: 3 Data size: 291 Basic stats: COMPLETE Column stats: COMPLETE
                   Filter Operator
                     predicate: (c = 66) (type: boolean)
@@ -483,8 +483,8 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: ice01
-                  branch name: branch_test1
                   filterExpr: a is not null (type: boolean)
+                  Snapshot ref: branch_test1
                   Statistics: Num rows: 3 Data size: 291 Basic stats: COMPLETE Column stats: COMPLETE
                   Filter Operator
                     predicate: a is not null (type: boolean)
@@ -854,7 +854,7 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: ice01
-                  branch name: branch_test1
+                  Snapshot ref: branch_test1
                   Statistics: Num rows: 3 Data size: 285 Basic stats: COMPLETE Column stats: COMPLETE
                   Select Operator
                     expressions: a (type: int), b (type: string), c (type: int)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index d9c726da122..524af17fd0a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -257,7 +257,7 @@ public final class Utilities {
   public static final String MAPNAME = "Map ";
   public static final String REDUCENAME = "Reducer ";
   public static final String ENSURE_OPERATORS_EXECUTED = "ENSURE_OPERATORS_EXECUTED";
-  public static final String BRANCH_NAME = "branch_name";
+  public static final String SNAPSHOT_REF = "snapshot_ref";

   @Deprecated
   protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max";
@@ -764,8 +764,8 @@ public final class Utilities {
     if (tbl.getMetaTable() != null) {
       props.put("metaTable", tbl.getMetaTable());
     }
-    if (tbl.getBranchName() != null) {
-      props.put(BRANCH_NAME, tbl.getBranchName());
+    if (tbl.getSnapshotRef() != null) {
+      props.put(SNAPSHOT_REF, tbl.getSnapshotRef());
     }
     return (new TableDesc(tbl.getInputFormatClass(), tbl
         .getOutputFormatClass(), props));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 3a95f4769d9..987411401e4 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -998,8 +998,8 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       jobConf.set(TableScanDesc.FROM_VERSION, scanDesc.getVersionIntervalFrom());
     }

-    if (scanDesc.getBranchName() != null) {
-      jobConf.set(TableScanDesc.BRANCH_NAME, scanDesc.getBranchName());
+    if (scanDesc.getSnapshotRef() != null) {
+      jobConf.set(TableScanDesc.SNAPSHOT_REF, scanDesc.getSnapshotRef());
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
index c91ae9ede7c..8009edca22c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
@@ -109,7 +109,8 @@ public final class HiveUtils {
   static final byte[] tabEscapeBytes = "\\t".getBytes();;
   static final byte[] tabUnescapeBytes = "\t".getBytes();
   static final byte[] ctrlABytes = "\u0001".getBytes();
-  static final Pattern BRANCH = Pattern.compile("branch_(.*)");
+  static final Pattern TAG = Pattern.compile("tag_(.*)");
+  static final Pattern SNAPSHOT_REF = Pattern.compile("(?:branch_|tag_)(.*)");

   public static final Logger LOG = LoggerFactory.getLogger(HiveUtils.class);

@@ -443,11 +444,16 @@ public final class HiveUtils {
     return new Path(root, dbName);
   }

-  public static String getTableBranch(String branchName) {
-    Matcher branch = BRANCH.matcher(branchName);
-    if (branch.matches()) {
-      return branch.group(1);
+  public static String getTableSnapshotRef(String refName) {
+    Matcher ref = SNAPSHOT_REF.matcher(refName);
+    if (ref.matches()) {
+      return ref.group(1);
     }
     return null;
   }
+
+  public static Boolean isTableTag(String refName) {
+    Matcher ref = TAG.matcher(refName);
+    return ref.matches();
+  }
 }
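The two helpers above carry the whole naming contract: SNAPSHOT_REF strips either prefix, while TAG matches tags only and drives the write rejection further down. A standalone sketch of the expected behavior (same regexes as the hunk; the class name is illustrative):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class RefNameSketch {
      // regexes copied from the HiveUtils hunk above
      static final Pattern TAG = Pattern.compile("tag_(.*)");
      static final Pattern SNAPSHOT_REF = Pattern.compile("(?:branch_|tag_)(.*)");

      static String getTableSnapshotRef(String refName) {
        Matcher ref = SNAPSHOT_REF.matcher(refName);
        return ref.matches() ? ref.group(1) : null;  // bare ref name, or null if not a ref
      }

      public static void main(String[] args) {
        System.out.println(getTableSnapshotRef("branch_test1"));    // test1
        System.out.println(getTableSnapshotRef("tag_test1"));       // test1
        System.out.println(getTableSnapshotRef("files"));           // null (metadata table, not a ref)
        System.out.println(TAG.matcher("tag_test1").matches());     // true  -> writes rejected
        System.out.println(TAG.matcher("branch_test1").matches());  // false -> writes allowed
      }
    }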
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index 5f1cbad9b34..21752177e63 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -140,7 +140,7 @@ public class Table implements Serializable {
    */
   private String asOfTimestamp = null;

-  private String branchName;
+  private String snapshotRef;

   /**
    * Used only for serialization.
@@ -184,7 +184,7 @@ public class Table implements Serializable {
       newTab.setVersionIntervalFrom(this.versionIntervalFrom);

       newTab.setMetaTable(this.getMetaTable());
-      newTab.setBranchName(this.getBranchName());
+      newTab.setSnapshotRef(this.getSnapshotRef());

       return newTab;
     }
@@ -1360,12 +1360,12 @@ public class Table implements Serializable {
     this.metaTable = metaTable;
   }

-  public String getBranchName() {
-    return branchName;
+  public String getSnapshotRef() {
+    return snapshotRef;
   }

-  public void setBranchName(String branchName) {
-    this.branchName = branchName;
+  public void setSnapshotRef(String snapshotRef) {
+    this.snapshotRef = snapshotRef;
   }

   public SourceTable createSourceTable() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
index 7e40f0d4e29..f15dbf5694d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/ASTBuilder.java
@@ -31,7 +31,6 @@ import org.apache.calcite.util.TimeString;
 import org.apache.calcite.util.TimestampString;
 import org.apache.hadoop.hive.common.type.HiveIntervalDayTime;
 import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth;
-import org.apache.hadoop.hive.common.type.TimestampTZUtil;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.ql.optimizer.calcite.RelOptHiveTable;
 import org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan;
@@ -83,8 +82,8 @@ public class ASTBuilder {
         .add(HiveParser.Identifier, hTbl.getHiveTableMD().getTableName());
     if (hTbl.getHiveTableMD().getMetaTable() != null) {
       tableNameBuilder.add(HiveParser.Identifier, hTbl.getHiveTableMD().getMetaTable());
-    } else if (hTbl.getHiveTableMD().getBranchName() != null) {
-      tableNameBuilder.add(HiveParser.Identifier, hTbl.getHiveTableMD().getBranchName());
+    } else if (hTbl.getHiveTableMD().getSnapshotRef() != null) {
+      tableNameBuilder.add(HiveParser.Identifier, hTbl.getHiveTableMD().getSnapshotRef());
     }

     ASTBuilder b = ASTBuilder.construct(HiveParser.TOK_TABREF, "TOK_TABREF").add(tableNameBuilder);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
index 63a58ffc697..95aef18b654 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
@@ -181,8 +181,8 @@ public class PartitionPruner extends Transform {
     String key = tab.getFullyQualifiedName() + ";";
     if (tab.getMetaTable() != null) {
       key = tab.getFullyQualifiedName() + "." + tab.getMetaTable() + ";";
-    } else if (tab.getBranchName() != null) {
-      key = tab.getFullyQualifiedName() + "." + tab.getBranchName() + ";";
+    } else if (tab.getSnapshotRef() != null) {
+      key = tab.getFullyQualifiedName() + "." + tab.getSnapshotRef() + ";";
     }

     if (!tab.isPartitioned()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index bbbc467ca76..61821340921 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -2399,12 +2399,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         }

         boolean isTableWrittenTo = qb.getParseInfo().isInsertIntoTable(ts.tableHandle.getDbName(),
-            ts.tableHandle.getTableName(), ts.tableHandle.getBranchName());
+            ts.tableHandle.getTableName(), ts.tableHandle.getSnapshotRef());
         isTableWrittenTo |= (qb.getParseInfo().getInsertOverwriteTables().
             get(getUnescapedName((ASTNode) ast.getChild(0), ts.tableHandle.getDbName()).toLowerCase()) != null);
         assert isTableWrittenTo :
             "Inconsistent data structure detected: we are writing to " + ts.tableHandle + " in " +
                 name + " but it's not in isInsertIntoTable() or getInsertOverwriteTables()";
+        Boolean isTableTag = Optional.ofNullable(ts.tableHandle.getSnapshotRef()).map(HiveUtils::isTableTag)
+            .orElse(false);
+        if (isTableTag) {
+          throw new UnsupportedOperationException("Don't support write (insert/delete/update/merge) to iceberg tag " +
+              HiveUtils.getTableSnapshotRef(ts.tableHandle.getSnapshotRef()));
+        }
         // Disallow update and delete on non-acid tables
         final boolean isWriteOperation = updating(name) || deleting(name);
         boolean isFullAcid = AcidUtils.isFullAcidTable(ts.tableHandle) ||
@@ -7387,7 +7393,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         throw new SemanticException("Failed to allocate write Id", ex);
       }
       boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
-          destinationTable.getDbName(), destinationTable.getTableName(), destinationTable.getBranchName());
+          destinationTable.getDbName(), destinationTable.getTableName(), destinationTable.getSnapshotRef());
       ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, dpCtx, acidOp, isReplace, writeId);
       if (writeId != null) {
         ltd.setStmtId(txnMgr.getCurrentStmtId());
@@ -7396,7 +7402,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
       // deltas and base and leave them up to the cleaner to clean up
       boolean isInsertInto = qb.getParseInfo().isInsertIntoTable(
-          destinationTable.getDbName(), destinationTable.getTableName(), destinationTable.getBranchName());
+          destinationTable.getDbName(), destinationTable.getTableName(), destinationTable.getSnapshotRef());
       LoadFileType loadType;
       if (isDirectInsert) {
         loadType = LoadFileType.IGNORE;
@@ -7416,7 +7422,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       setStatsForNonNativeTable(destinationTable.getDbName(), destinationTable.getTableName());
       // true if it is insert overwrite.
       boolean overwrite = !qb.getParseInfo().isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName(),
-          destinationTable.getBranchName());
+          destinationTable.getSnapshotRef());
       createPreInsertDesc(destinationTable, overwrite);

       ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, partSpec == null ? ImmutableMap.of() : partSpec);
@@ -8008,11 +8014,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     if (destType == QBMetaData.DEST_TABLE) {
       genAutoColumnStatsGatheringPipeline(destinationTable, partSpec, input,
           qb.getParseInfo().isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName(),
-              destinationTable.getBranchName()), false);
+              destinationTable.getSnapshotRef()), false);
     } else if (destType == QBMetaData.DEST_PARTITION) {
       genAutoColumnStatsGatheringPipeline(destinationTable, destinationPartition.getSpec(), input,
           qb.getParseInfo().isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName(),
-              destinationTable.getBranchName()), false);
+              destinationTable.getSnapshotRef()), false);
     } else if (destType == QBMetaData.DEST_LOCAL_FILE || destType == QBMetaData.DEST_DFS_FILE) {
       // CTAS or CMV statement
       genAutoColumnStatsGatheringPipeline(destinationTable, null, input,
@@ -8493,7 +8499,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     // If the query here is an INSERT_INTO and the target is an immutable table,
     // verify that our destination is empty before proceeding
     if (!dest_tab.isImmutable() || !qb.getParseInfo().isInsertIntoTable(
-        dest_tab.getDbName(), dest_tab.getTableName(), dest_tab.getBranchName())) {
+        dest_tab.getDbName(), dest_tab.getTableName(), dest_tab.getSnapshotRef())) {
       return;
     }
     try {
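The hunk at line 2399 is where tags diverge from branches: branches stay writable, tags do not, since a tag is an immutable snapshot pointer. A condensed restatement of the new guard (illustrative, not the literal patch code):

    String ref = ts.tableHandle.getSnapshotRef();    // "tag_test1", "branch_test1", or null
    if (ref != null && HiveUtils.isTableTag(ref)) {  // the patch uses Optional.ofNullable for the null case
      throw new UnsupportedOperationException(
          "Don't support write (insert/delete/update/merge) to iceberg tag " +
              HiveUtils.getTableSnapshotRef(ref));   // error message shows the bare tag name
    }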
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 4e669171e77..d6e8935a3e9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -115,8 +115,8 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
   public static final String FROM_VERSION =
       "hive.io.version.from";

-  public static final String BRANCH_NAME =
-      "hive.io.branch.name";
+  public static final String SNAPSHOT_REF =
+      "hive.io.snapshot.ref";

   // input file name (big) to bucket number
   private Map<String, Integer> bucketFileNameMapping;
@@ -147,7 +147,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD

   private String asOfTimestamp = null;

-  private String branchName = null;
+  private String snapshotRef = null;

   public TableScanDesc() {
     this(null, null);
@@ -179,7 +179,7 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
       asOfTimestamp = tblMetadata.getAsOfTimestamp();
       asOfVersion = tblMetadata.getAsOfVersion();
       versionIntervalFrom = tblMetadata.getVersionIntervalFrom();
-      branchName = tblMetadata.getBranchName();
+      snapshotRef = tblMetadata.getSnapshotRef();
     }
     isTranscationalTable = AcidUtils.isTransactionalTable(this.tableMetadata);
     if (isTranscationalTable) {
@@ -549,9 +549,9 @@ public class TableScanDesc extends AbstractOperatorDesc implements IStatsGatherD
     return asOfTimestamp;
   }

-  @Explain(displayName = "branch name")
-  public String getBranchName() {
-    return branchName;
+  @Explain(displayName = "Snapshot ref")
+  public String getSnapshotRef() {
+    return snapshotRef;
   }

   public class TableScanOperatorExplainVectorization extends OperatorExplainVectorization {
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/TableName.java b/storage-api/src/java/org/apache/hadoop/hive/common/TableName.java
index 27ad59fdd4e..1c9cee6d160 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/TableName.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/TableName.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.common;

 import java.io.Serializable;
 import java.util.Objects;
+import java.util.regex.Pattern;

 /**
  * A container for a fully qualified table name, i.e. catalogname.databasename.tablename.  Also
@@ -31,7 +32,7 @@ public class TableName implements Serializable {
   /** Exception message thrown. */
   private static final String ILL_ARG_EXCEPTION_MSG =
       "Table name must be either <tablename>, <dbname>.<tablename> " + "or <catname>.<dbname>.<tablename>";
-  public static final String BRANCH_NAME_PREFIX = "branch_";
+  public static final Pattern SNAPSHOT_REF = Pattern.compile("(?:branch_|tag_)(.*)");

   /** Names of the related DB objects. */
   private final String cat;
@@ -91,7 +92,7 @@ public class TableName implements Serializable {
     if (names.length == 2) {
       return new TableName(defaultCatalog, names[0], names[1], null);
     } else if (names.length == 3) {
-      if (names[2].startsWith(BRANCH_NAME_PREFIX)) {
+      if (SNAPSHOT_REF.matcher(names[2]).matches()) {
         return new TableName(defaultCatalog, names[0], names[1], names[2]);
       } else {
         return new TableName(names[0], names[1], names[2], null);
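The TableName change is what lets a three-part identifier such as default.ice01.tag_test1 parse as table-plus-ref instead of catalog.db.table. A standalone illustration of that dispatch, using the same regex as the hunk (class and output format are illustrative):

    import java.util.regex.Pattern;

    public class TableNameParseSketch {
      // same regex as the TableName hunk above
      static final Pattern SNAPSHOT_REF = Pattern.compile("(?:branch_|tag_)(.*)");

      public static void main(String[] args) {
        for (String id : new String[] {"default.ice01.tag_test1", "hive.default.ice01"}) {
          String[] parts = id.split("\\.");
          if (SNAPSHOT_REF.matcher(parts[2]).matches()) {
            // third part names a snapshot ref of db.table
            System.out.println(id + " -> table " + parts[0] + "." + parts[1] + " at ref " + parts[2]);
          } else {
            // ordinary three-part name: catalog.db.table
            System.out.println(id + " -> catalog " + parts[0] + ", table " + parts[1] + "." + parts[2]);
          }
        }
      }
    }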