This is an automated email from the ASF dual-hosted git repository. lpinter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new a7ea25a72e HIVE-26133: Insert overwrite on Iceberg tables can result in duplicate entries after partition evolution (#3202) (Laszlo Pinter, reviewed by Marton Bod and Peter Vary) a7ea25a72e is described below commit a7ea25a72ec5334d3cac15f503b651de8200ff9c Author: László Pintér <47777102+lcspin...@users.noreply.github.com> AuthorDate: Thu Apr 14 15:22:42 2022 +0200 HIVE-26133: Insert overwrite on Iceberg tables can result in duplicate entries after partition evolution (#3202) (Laszlo Pinter, reviewed by Marton Bod and Peter Vary) --- .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 10 ++++++++++ .../apache/iceberg/mr/hive/TestHiveIcebergInserts.java | 18 ++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java index e68458eafe..4c82eb78cd 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java @@ -78,6 +78,7 @@ import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.JobStatus; import org.apache.hadoop.mapred.OutputCommitter; import org.apache.hadoop.mapred.OutputFormat; +import org.apache.iceberg.ManifestFile; import org.apache.iceberg.PartitionSpecParser; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; @@ -460,6 +461,15 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H if (IcebergTableUtil.isBucketed(table)) { throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table."); } + if (table.currentSnapshot() != null) { + if (table.currentSnapshot().allManifests().parallelStream().map(ManifestFile::partitionSpecId) + .anyMatch(id -> id < table.spec().specId())) { + throw new SemanticException( + "Cannot perform insert overwrite query on Iceberg table where partition evolution happened. In order " + + "to succesfully carry out any insert overwrite operation on this table, the data has to be rewritten " + + "conforming to the latest spec. "); + } + } } } diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java index 8545447cd2..f38eea1969 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java @@ -183,6 +183,24 @@ public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineB testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, target, true))); } + @Test + public void testInsertOverwriteWithPartitionEvolutionThrowsError() throws IOException { + TableIdentifier target = TableIdentifier.of("default", "target"); + Table table = testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS); + shell.executeStatement("ALTER TABLE target SET PARTITION SPEC(TRUNCATE(2, last_name))"); + List<Record> newRecords = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + .add(0L, "Mike", "Taylor") + .add(1L, "Christy", "Hubert") + .build(); + AssertHelpers.assertThrows("IOW should not work on tables with partition evolution", + IllegalArgumentException.class, + "Cannot perform insert overwrite query on Iceberg table where partition evolution happened.", + () -> shell.executeStatement(testTables.getInsertQuery(newRecords, target, true))); + // TODO: we should add additional test cases after merge + compaction is supported in hive that allows us to + // rewrite the data + } + /** * Testing map-reduce inserts. * @throws IOException If there is an underlying IOException