[hive] branch master updated: HIVE-26183: Create delete writer for the UPDATE statements (Peter Vary reviewed by Adam Szita and Marton Bod) (#3251)
This is an automated email from the ASF dual-hosted git repository. pvary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git The following commit(s) were added to refs/heads/master by this push: new 9067431bee HIVE-26183: Create delete writer for the UPDATE statements (Peter Vary reviewed by Adam Szita and Marton Bod) (#3251) 9067431bee is described below commit 9067431bee431c3f51124e30c6551ed04b2e3d22 Author: pvary AuthorDate: Wed May 4 10:50:04 2022 +0200 HIVE-26183: Create delete writer for the UPDATE statements (Peter Vary reviewed by Adam Szita and Marton Bod) (#3251) --- iceberg/iceberg-handler/pom.xml| 7 +- .../org/apache/iceberg/mr/hive/FilesForCommit.java | 18 +- .../mr/hive/HiveIcebergBufferedDeleteWriter.java | 181 + .../iceberg/mr/hive/HiveIcebergDeleteWriter.java | 4 +- .../mr/hive/HiveIcebergOutputCommitter.java| 6 +- .../iceberg/mr/hive/HiveIcebergOutputFormat.java | 4 +- .../iceberg/mr/hive/HiveIcebergRecordWriter.java | 6 +- .../iceberg/mr/hive/HiveIcebergUpdateWriter.java | 89 ++ .../apache/iceberg/mr/hive/HiveIcebergWriter.java | 84 +- ...ebergWriter.java => HiveIcebergWriterBase.java} | 18 +- .../apache/iceberg/mr/hive/IcebergAcidUtil.java| 108 +--- .../org/apache/iceberg/data/IcebergGenerics2.java | 103 .../java/org/apache/iceberg/mr/TestHelper.java | 27 +++ .../iceberg/mr/hive/HiveIcebergWriterTestBase.java | 151 + .../mr/hive/TestHiveIcebergDeleteWriter.java | 116 + .../mr/hive/TestHiveIcebergOutputCommitter.java| 4 +- .../mr/hive/TestHiveIcebergUpdateWriter.java | 159 ++ iceberg/pom.xml| 6 + 18 files changed, 963 insertions(+), 128 deletions(-) diff --git a/iceberg/iceberg-handler/pom.xml b/iceberg/iceberg-handler/pom.xml index 6a37fdbf16..20ae1e6ec9 100644 --- a/iceberg/iceberg-handler/pom.xml +++ b/iceberg/iceberg-handler/pom.xml @@ -100,11 +100,16 @@ assertj-core test + + org.apache.iceberg + iceberg-core + tests + test + org.roaringbitmap RoaringBitmap 0.9.22 - test diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java index 0dd490628c..237ef55369 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java @@ -20,8 +20,8 @@ package org.apache.iceberg.mr.hive; import java.io.Serializable; +import java.util.Collection; import java.util.Collections; -import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.iceberg.ContentFile; @@ -30,19 +30,19 @@ import org.apache.iceberg.DeleteFile; public class FilesForCommit implements Serializable { - private final List dataFiles; - private final List deleteFiles; + private final Collection dataFiles; + private final Collection deleteFiles; - public FilesForCommit(List dataFiles, List deleteFiles) { + public FilesForCommit(Collection dataFiles, Collection deleteFiles) { this.dataFiles = dataFiles; this.deleteFiles = deleteFiles; } - public static FilesForCommit onlyDelete(List deleteFiles) { + public static FilesForCommit onlyDelete(Collection deleteFiles) { return new FilesForCommit(Collections.emptyList(), deleteFiles); } - public static FilesForCommit onlyData(List dataFiles) { + public static FilesForCommit onlyData(Collection dataFiles) { return new FilesForCommit(dataFiles, Collections.emptyList()); } @@ -50,15 +50,15 @@ public class FilesForCommit implements Serializable { return new FilesForCommit(Collections.emptyList(), Collections.emptyList()); } - public List dataFiles() { + public Collection dataFiles() { return dataFiles; } - public List deleteFiles() { + public Collection deleteFiles() { return deleteFiles; } - public List allFiles() { + public Collection allFiles() { return Stream.concat(dataFiles.stream(), deleteFiles.stream()).collect(Collectors.toList()); } } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java new file mode 100644 index 00..99d59341ed --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergBufferedDeleteWriter.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work
[hive] branch master updated: HIVE-26076: Non blocking ADD PARTITION if not exists (Denys Kuzmenko, reviewed by Antal Sinkovits, Laszlo Vegh)
This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git The following commit(s) were added to refs/heads/master by this push: new 71c04dd5bb HIVE-26076: Non blocking ADD PARTITION if not exists (Denys Kuzmenko, reviewed by Antal Sinkovits, Laszlo Vegh) 71c04dd5bb is described below commit 71c04dd5bb51176eb79052818aa179b0b535c6d8 Author: Denys Kuzmenko AuthorDate: Wed May 4 09:51:33 2022 +0200 HIVE-26076: Non blocking ADD PARTITION if not exists (Denys Kuzmenko, reviewed by Antal Sinkovits, Laszlo Vegh) Closes #3122 --- .../add/AbstractAddPartitionAnalyzer.java | 6 +- .../apache/hadoop/hive/ql/TestTxnAddPartition.java | 13 +--- .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 78 +++--- 3 files changed, 59 insertions(+), 38 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AbstractAddPartitionAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AbstractAddPartitionAnalyzer.java index 9da2daa8ec..b8433d2617 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AbstractAddPartitionAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AbstractAddPartitionAnalyzer.java @@ -59,11 +59,11 @@ abstract class AbstractAddPartitionAnalyzer extends AbstractAlterTableAnalyzer { boolean ifNotExists = command.getChild(0).getType() == HiveParser.TOK_IFNOTEXISTS; outputs.add(new WriteEntity(table, -/* use DDL_EXCLUSIVE to cause X lock to prevent races between concurrent add partition calls with IF NOT EXISTS. +/* use DDL_EXCL_WRITE to cause X_WRITE lock to prevent races between concurrent add partition calls with IF NOT EXISTS. * w/o this 2 concurrent calls to add the same partition may both add data since for transactional tables * creating partition metadata and moving data there are 2 separate actions. */ -ifNotExists && AcidUtils.isTransactionalTable(table) ? -WriteType.DDL_EXCLUSIVE : WriteEntity.WriteType.DDL_SHARED)); + ifNotExists && AcidUtils.isTransactionalTable(table) ? + WriteType.DDL_EXCL_WRITE : WriteType.DDL_SHARED)); List partitions = createPartitions(command, table, ifNotExists); if (partitions.isEmpty()) { // nothing to do diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java index fa15e2876a..d07a2281a1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnAddPartition.java @@ -21,10 +21,8 @@ package org.apache.hadoop.hive.ql; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -269,16 +267,7 @@ public class TestTxnAddPartition extends TxnCommandsBaseForTests { "warehouse/t/p=0/delta_001_001_/01_0"}}; checkExpected(rs, expected, "add partition (p=0)"); } - - /** - * {@link TestDbTxnManager2#testAddPartitionLocks} - */ - @Ignore - @Test - public void testLocks() throws Exception { - } - - + @Test public void addPartitionTransactional() throws Exception { exception.expect(RuntimeException.class); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java index 613e3ab858..33c303b2e4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java @@ -3116,29 +3116,6 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{ @Rule public TemporaryFolder exportFolder = new TemporaryFolder(); - /** - * see also {@link org.apache.hadoop.hive.ql.TestTxnAddPartition} - */ - @Test - public void testAddPartitionLocks() throws Exception { -dropTable(new String[] {"T", "Tstage"}); -driver.run("create table T (a int, b int) partitioned by (p int) " + -"stored as orc tblproperties('transactional'='true')"); -//bucketed just so that we get 2 files -driver.run("create table Tstage (a int, b int) clustered by (a) into 2 " + -"buckets stored as orc tblproperties('transactional'='false')"); -driver.run("insert into Tstage values(0,2),(1,4)"); -String exportLoc = exportFolder.newFolder("1").toString(); -driver.run("export table Tstage to '" + exportLoc + "'"); - -driver.compileAndRespond("ALTER TABLE T ADD if not exists P