Cherry-picked HIVE-15899 from master
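The tests in this patch pin down ROW__IDs such as {"transactionid":18,"bucketid":536870913,"rowid":0} and tie each leg of a removed UNION ALL to its own statementId, which is what keeps the per-leg delta directories (delta_0000018_0000018_0001, _0002, ...) from colliding. A minimal sketch of decoding those bucketid values with BucketCodec, assuming decodeStatementId is available next to the decodeWriterId call TestAcidOnTez already makes; the class and its main() are illustrative only, not part of the patch:

import org.apache.hadoop.hive.ql.io.BucketCodec;

public class RowIdBucketDemo {
  public static void main(String[] args) {
    // bucketid values copied from the ROW__IDs asserted in TestAcidOnTez below
    int[] bucketProperties = {536870912, 536870913, 536870914, 536936448};
    for (int prop : bucketProperties) {
      BucketCodec codec = BucketCodec.determineVersion(prop);
      // per the expected results below, 536870913/536870914 are the first and second
      // union legs (statementId 1 and 2) of writer 0; 536936448 is writer/bucket 1
      System.out.printf("%d -> writerId=%d, statementId=%d%n",
          prop, codec.decodeWriterId(prop), codec.decodeStatementId(prop));
    }
  }
}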
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5b3c7979 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5b3c7979 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5b3c7979 Branch: refs/heads/hive-14535 Commit: 5b3c79798e28fd3c7f5718e361efee0f77eec047 Parents: 6f0be79 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Tue Sep 26 19:42:06 2017 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Tue Sep 26 19:48:41 2017 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/ql/TestAcidOnTez.java | 329 +++++++++++++++---- .../java/org/apache/hadoop/hive/ql/Context.java | 3 + .../java/org/apache/hadoop/hive/ql/Driver.java | 6 +- .../apache/hadoop/hive/ql/exec/MoveTask.java | 13 +- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 12 + .../apache/hadoop/hive/ql/metadata/Hive.java | 43 ++- .../ql/optimizer/QueryPlanPostProcessor.java | 166 ++++++++++ .../optimizer/unionproc/UnionProcFactory.java | 3 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 35 +- .../apache/hadoop/hive/ql/plan/LoadDesc.java | 22 +- .../hadoop/hive/ql/plan/LoadFileDesc.java | 22 +- .../hadoop/hive/ql/plan/LoadTableDesc.java | 29 +- .../hadoop/hive/ql/plan/TezEdgeProperty.java | 10 +- .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 189 +++++++++-- .../clientpositive/autoColumnStats_4.q.out | 1 + .../clientpositive/llap/acid_no_buckets.q.out | 8 + .../llap/dynamic_semijoin_reduction_3.q.out | 7 + .../llap/dynpart_sort_optimization_acid.q.out | 12 + .../results/clientpositive/llap/mm_all.q.out | 1 + .../results/clientpositive/llap/sqlmerge.q.out | 4 + ql/src/test/results/clientpositive/mm_all.q.out | 1 + 21 files changed, 760 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index d0b5cf6..8b4b21f 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; import org.apache.hadoop.hive.metastore.txn.TxnStore; import org.apache.hadoop.hive.metastore.txn.TxnUtils; +import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; @@ -88,7 +89,6 @@ public class TestAcidOnTez { hiveConf = new HiveConf(this.getClass()); hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, TEST_WAREHOUSE_DIR); hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict"); hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName()); @@ -179,14 +179,14 @@ public class TestAcidOnTez { runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + 
" stored as ORC TBLPROPERTIES('transactional'='false') as " + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); - List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME"); + List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME", confForTez); String expected0[][] = { - {"1\t2", "/1/000000_0"}, - {"3\t4", "/1/000000_0"}, - {"5\t6", "/1/000000_0"}, - {"5\t6", "/2/000000_0"}, - {"7\t8", "/2/000000_0"}, - {"9\t10", "/2/000000_0"}, + {"1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, }; Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size()); //verify data and layout @@ -195,9 +195,9 @@ public class TestAcidOnTez { Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected0[i][1])); } //make the table ACID - runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET TBLPROPERTIES ('transactional'='true')", confForTez); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez); LOG.warn("after ctas:"); for (String s : rs) { LOG.warn(s); @@ -206,12 +206,12 @@ public class TestAcidOnTez { /* * Expected result 0th entry i the RecordIdentifier + data. 
1st entry file before compact*/ String expected[][] = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, }; Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); //verify data and layout @@ -220,17 +220,17 @@ public class TestAcidOnTez { Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); } //perform some Update/Delete - runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set a = 70, b = 80 where a = 7"); - runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 5"); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set a = 70, b = 80 where a = 7", confForTez); + runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 5", confForTez); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez); LOG.warn("after update/delete:"); for (String s : rs) { LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000021_0000021_0000/bucket_00000"} }; Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); @@ -255,9 +255,9 @@ public class TestAcidOnTez { Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]); } //run Minor compaction - runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'minor'"); + runStatementOnDriver("alter table " + 
Table.NONACIDNONBUCKET + " compact 'minor'", confForTez); TestTxnCommands2.runWorker(hiveConf); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez); LOG.warn("after compact minor:"); for (String s : rs) { LOG.warn(s); @@ -285,9 +285,9 @@ public class TestAcidOnTez { Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]); } //run Major compaction - runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'major'"); + runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 'major'", confForTez); TestTxnCommands2.runWorker(hiveConf); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez); LOG.warn("after compact major:"); for (String s : rs) { LOG.warn(s); @@ -306,8 +306,12 @@ public class TestAcidOnTez { * How to do this? CTAS is the only way to create data files which are not immediate children * of the partition dir. CTAS/Union/Tez doesn't support partition tables. The only way is to copy * data files in directly. + * + * Actually Insert Into ... select ... union all ... with + * HIVE_OPTIMIZE_UNION_REMOVE (and HIVEFETCHTASKCONVERSION="none"?) will create subdirs + * but if writing to non acid table there is a merge task on MR (but not on Tez) */ - @Ignore("HIVE-17214") + @Ignore("HIVE-17214")//this consistently works locally but never in ptest.... @Test public void testNonStandardConversion02() throws Exception { HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf @@ -320,13 +324,13 @@ public class TestAcidOnTez { "union all select a, b from " + Table.ACIDTBL + " where a = 5", confForTez); List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from " + - Table.NONACIDNONBUCKET + " order by a, b"); + Table.NONACIDNONBUCKET + " order by a, b", confForTez); String expected0[][] = { - {"1\t2", "/1/000000_0"}, - {"3\t4", "/1/000000_0"}, - {"5\t6", "/3/000000_0"}, - {"7\t8", "/2/000000_0"}, - {"9\t10", "/2/000000_0"}, + {"1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "3/000000_0"}, + {"7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, }; Assert.assertEquals("Unexpected row count after ctas", expected0.length, rs.size()); //verify data and layout @@ -338,7 +342,7 @@ public class TestAcidOnTez { FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); //ensure there is partition dir - runStatementOnDriver("insert into " + Table.NONACIDPART + " partition (p=1) values (100,110)"); + runStatementOnDriver("insert into " + Table.NONACIDPART + " partition (p=1) values (100,110)", confForTez); //creates more files in that partition for(FileStatus stat : status) { int limit = 5; @@ -357,29 +361,29 @@ public class TestAcidOnTez { nonacidpart/ âââ p=1 âââ 000000_0 - âââ 1 + âââ HIVE_UNION_SUBDIR__1 â  âââ 000000_0 - âââ 2 + 
âââ HIVE_UNION_SUBDIR_2 â  âââ 000000_0 - âââ 3 + âââ HIVE_UNION_SUBDIR_3 âââ 000000_0 4 directories, 4 files **/ //make the table ACID - runStatementOnDriver("alter table " + Table.NONACIDPART + " SET TBLPROPERTIES ('transactional'='true')"); - rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID"); + runStatementOnDriver("alter table " + Table.NONACIDPART + " SET TBLPROPERTIES ('transactional'='true')", confForTez); + rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID", confForTez); LOG.warn("after acid conversion:"); for (String s : rs) { LOG.warn(s); } String[][] expected = { {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t100\t110\t1", "nonacidpart/p=1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/3/000000_0"} + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", "nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "3/000000_0"} }; Assert.assertEquals("Wrong row count", expected.length, rs.size()); //verify data and layout @@ -389,9 +393,9 @@ public class TestAcidOnTez { } //run Major compaction - runStatementOnDriver("alter table " + Table.NONACIDPART + " partition (p=1) compact 'major'"); + runStatementOnDriver("alter table " + Table.NONACIDPART + " partition (p=1) compact 'major'", confForTez); TestTxnCommands2.runWorker(hiveConf); - rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from " + Table.NONACIDPART + " order by ROW__ID", confForTez); LOG.warn("after major compaction:"); for (String s : rs) { LOG.warn(s); @@ -406,33 +410,42 @@ public class TestAcidOnTez { } /** * CTAS + Tez + Union creates a non-standard layout in table dir - * Each leg of the union places data into a subdir of the table/partition. Subdirs are named 1/, 2/, etc - * The way this currently works is that CTAS creates an Acid table but the insert statement writes - * the data in non-acid layout. Then on read, it's treated like an non-acid to acid conversion. - * Longer term CTAS should create acid layout from the get-go. + * Each leg of the union places data into a subdir of the table/partition. 
+ * Subdirs are named HIVE_UNION_SUBDIR_1/, HIVE_UNION_SUBDIR_2/, etc + * For Acid tables the writer for each dir must have a different statementId ensured by + * {@link org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor}. + * {@link org.apache.hadoop.hive.ql.metadata.Hive#moveAcidFiles(FileSystem, FileStatus[], Path, List)} drops the union subdirs + * since each delta file has a unique name. */ @Test public void testCtasTezUnion() throws Exception { HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + confForTez.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false); setupTez(confForTez); //CTAS with ACID target table + List<String> rs0 = runStatementOnDriver("explain create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " + + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); + LOG.warn("explain ctas:");//TezEdgeProperty.EdgeType + for (String s : rs0) { + LOG.warn(s); + } runStatementOnDriver("create table " + Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " + "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez); - List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID", confForTez); LOG.warn("after ctas:"); for (String s : rs) { LOG.warn(s); } Assert.assertEquals(0, BucketCodec.determineVersion(536870912).decodeWriterId(536870912)); /* - * Expected result 0th entry i the RecordIdentifier + data. 1st entry file before compact*/ + * Expected result 0th entry is the RecordIdentifier + data. 
1st entry file before compact*/ String expected[][] = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", "/2/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", "/2/000000_0"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0002/bucket_00000"}, }; Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); //verify data and layout @@ -441,18 +454,18 @@ public class TestAcidOnTez { Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); } //perform some Update/Delete - runStatementOnDriver("update " + Table.ACIDNOBUCKET + " set a = 70, b = 80 where a = 7"); - runStatementOnDriver("delete from " + Table.ACIDNOBUCKET + " where a = 5"); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + runStatementOnDriver("update " + Table.ACIDNOBUCKET + " set a = 70, b = 80 where a = 7", confForTez); + runStatementOnDriver("delete from " + Table.ACIDNOBUCKET + " where a = 5", confForTez); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID", confForTez); LOG.warn("after update/delete:"); for (String s : rs) { LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/1/000000_0"}, - {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", "/2/000000_0"}, - {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000019_0000019_0000/bucket_00000"} + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"}, + {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000020_0000020_0000/bucket_00000"} }; Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); //verify data and layout @@ -464,7 +477,7 @@ public class TestAcidOnTez { FileSystem fs = FileSystem.get(hiveConf); FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta = {"delete_delta_0000019_0000019_0000", 
"delete_delta_0000020_0000020_0000"}; + String[] expectedDelDelta = {"delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta.length; i++) { if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) { @@ -476,9 +489,9 @@ public class TestAcidOnTez { Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on disk", expectedDelDelta[i]); } //run Minor compaction - runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'minor'"); + runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'minor'", confForTez); TestTxnCommands2.runWorker(hiveConf); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID", confForTez); LOG.warn("after compact minor:"); for (String s : rs) { LOG.warn(s); @@ -493,7 +506,7 @@ public class TestAcidOnTez { //check we have right delete delta files after minor compaction status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta2 = { "delete_delta_0000019_0000019_0000", "delete_delta_0000020_0000020_0000", "delete_delta_0000019_0000020"}; + String[] expectedDelDelta2 = { "delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000", "delete_delta_0000018_0000021"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta2.length; i++) { if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) { @@ -506,9 +519,9 @@ public class TestAcidOnTez { Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found on disk", expectedDelDelta2[i]); } //run Major compaction - runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'major'"); + runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 'major'", confForTez); TestTxnCommands2.runWorker(hiveConf); - rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID", confForTez); LOG.warn("after compact major:"); for (String s : rs) { LOG.warn(s); @@ -517,7 +530,178 @@ public class TestAcidOnTez { for(int i = 0; i < expected2.length; i++) { Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); //everything is now in base/ - Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000020/bucket_00000")); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000021/bucket_00000")); + } + } + /** + * 1. Insert into regular unbucketed table from Union all - union is removed and data is placed in + * subdirs of target table. + * 2. convert to acid table and check data + * 3. 
compact and check data + * Compare with {@link #testAcidInsertWithRemoveUnion()} where T is transactional=true + */ + @Test + public void testInsertWithRemoveUnion() throws Exception { + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + setupTez(confForTez); + runStatementOnDriver("drop table if exists T", confForTez); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')", confForTez); + /* +ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505502329802/warehouse/t/.hive-staging_hive_2017-09-15_12-07-33_224_7717909516029836949-1/ +/Users/ekoifman/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505502329802/warehouse/t/.hive-staging_hive_2017-09-15_12-07-33_224_7717909516029836949-1/ +âââ -ext-10000 + âââ HIVE_UNION_SUBDIR_1 + â  âââ 000000_0 + âââ HIVE_UNION_SUBDIR_2 + â  âââ 000000_0 + âââ HIVE_UNION_SUBDIR_3 + âââ 000000_0 + +4 directories, 3 files + */ + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9", confForTez); + List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME", confForTez); + LOG.warn(testName.getMethodName() + ": before converting to acid"); + for(String s : rs) { + LOG.warn(s); + } + String[][] expected = { + {"1\t2","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"3\t4","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, + {"5\t6","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"7\t8","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, + {"9\t10","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "3/000000_0"} + }; + Assert.assertEquals("Unexpected row count after conversion", expected.length, rs.size()); + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + //make the table ACID + runStatementOnDriver("alter table T SET TBLPROPERTIES ('transactional'='true')", confForTez); + rs = runStatementOnDriver("select a,b from T order by a, b", confForTez); + Assert.assertEquals("After to Acid conversion", TestTxnCommands2.stringifyValues(values), rs); + + //run Major compaction + runStatementOnDriver("alter table T compact 'major'", confForTez); + TestTxnCommands2.runWorker(hiveConf); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID", confForTez); + LOG.warn(testName.getMethodName() + ": after compact major of T:"); + for (String s : rs) { + LOG.warn(s); + } + String[][] expected2 = { + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + 
{"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t7\t8", "warehouse/t/base_-9223372036854775808/bucket_00000"}, + {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", "warehouse/t/base_-9223372036854775808/bucket_00000"} + }; + Assert.assertEquals("Unexpected row count after major compact", expected2.length, rs.size()); + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); + Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected2[i][1])); + } + } + /** + * 1. Insert into unbucketed acid table from Union all - union is removed and data is placed in + * subdirs of target table. + * 2. convert to acid table and check data + * 3. compact and check data + * Compare with {@link #testInsertWithRemoveUnion()} where T is transactional=false + */ + @Test + public void testAcidInsertWithRemoveUnion() throws Exception { + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + setupTez(confForTez); + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + runStatementOnDriver("drop table if exists T", confForTez); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')", confForTez); + /*On Tez, below (T is transactional), we get the following layout +ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505500035574/warehouse/t/.hive-staging_hive_2017-09-15_11-28-33_960_9111484239090506828-1/ +/Users/ekoifman/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505500035574/warehouse/t/.hive-staging_hive_2017-09-15_11-28-33_960_9111484239090506828-1/ +âââ -ext-10000 + âââ HIVE_UNION_SUBDIR_1 + â  âââ 000000_0 + â  âââ _orc_acid_version + â  âââ delta_0000019_0000019_0001 + â  âââ bucket_00000 + âââ HIVE_UNION_SUBDIR_2 + â  âââ 000000_0 + â  âââ _orc_acid_version + â  âââ delta_0000019_0000019_0002 + â  âââ bucket_00000 + âââ HIVE_UNION_SUBDIR_3 + âââ 000000_0 + âââ _orc_acid_version + âââ delta_0000019_0000019_0003 + âââ bucket_00000 + +10 directories, 6 files */ + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + " where a between 1 and 3 union all select a, b from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9", confForTez); + List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b", confForTez); + LOG.warn(testName.getMethodName() + ": reading acid table T"); + for(String s : rs) { + LOG.warn(s); + } + + String[][] expected2 = { + {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000019_0000019_0001/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000019_0000019_0001/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000019_0000019_0002/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000019_0000019_0002/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000019_0000019_0003/bucket_00000"} + }; + Assert.assertEquals("Unexpected row count", expected2.length, rs.size()); + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " ac: " + 
rs.get(i), rs.get(i).startsWith(expected2[i][0])); + Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected2[i][1])); + } + } + @Test + public void testBucketedAcidInsertWithRemoveUnion() throws Exception { + HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing hive conf + setupTez(confForTez); + int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; + runStatementOnDriver("delete from " + Table.ACIDTBL, confForTez); + runStatementOnDriver("insert into " + Table.ACIDTBL + TestTxnCommands2.makeValuesClause(values));//make sure both buckets are not empty + runStatementOnDriver("drop table if exists T", confForTez); + /* + With bucketed target table Union All is not removed + + ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505510130462/warehouse/t/.hive-staging_hive_2017-09-15_14-16-32_422_4626314315862498838-1/ +/Users/ekoifman/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505510130462/warehouse/t/.hive-staging_hive_2017-09-15_14-16-32_422_4626314315862498838-1/ +âââ -ext-10000 + âââ 000000_0 + â  âââ _orc_acid_version + â  âââ delta_0000021_0000021_0000 + â  âââ bucket_00000 + âââ 000001_0 + âââ _orc_acid_version + âââ delta_0000021_0000021_0000 + âââ bucket_00001 + +5 directories, 4 files +*/ + runStatementOnDriver("create table T (a int, b int) clustered by (a) into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true')", confForTez); + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + " where a between 1 and 3 union all select a, b from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9", confForTez); + List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b", confForTez); + LOG.warn(testName.getMethodName() + ": reading bucketed acid table T"); + for(String s : rs) { + LOG.warn(s); + } + String[][] expected2 = { + {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000021_0000021_0000/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":1}\t5\t6", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000021_0000021_0000/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":2}\t9\t10", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"} + }; + Assert.assertEquals("Unexpected row count", expected2.length, rs.size()); + for(int i = 0; i < expected2.length; i++) { + Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); + Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), rs.get(i).endsWith(expected2[i][1])); } } // Ideally test like this should be a qfile test. 
However, the explain output from qfile is always @@ -613,6 +797,7 @@ public class TestAcidOnTez { private List<String> runStatementOnDriver(String stmt, HiveConf conf) throws Exception { Driver driver = new Driver(conf); + driver.setMaxRows(10000); CommandProcessorResponse cpr = driver.run(stmt); if(cpr.getResponseCode() != 0) { throw new RuntimeException(stmt + " failed: " + cpr); http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/Context.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java index c66934a..d074701 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java @@ -1002,4 +1002,7 @@ public class Context { public void setIsUpdateDeleteMerge(boolean isUpdate) { this.isUpdateDeleteMerge = isUpdate; } + public String getExecutionId() { + return executionId; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 1157e00..b35edfb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1198,7 +1198,11 @@ public class Driver implements CommandProcessor { } // Set the transaction id in all of the acid file sinks if (haveAcidWrite()) { - for (FileSinkDesc desc : plan.getAcidSinks()) { + List<FileSinkDesc> acidSinks = new ArrayList<>(plan.getAcidSinks()); + //sorting makes tests easier to write since file names and ROW__IDs depend on statementId + //so this makes (file name -> data) mapping stable + acidSinks.sort((FileSinkDesc fsd1, FileSinkDesc fsd2) -> fsd1.getDirName().compareTo(fsd2.getDirName())); + for (FileSinkDesc desc : acidSinks) { desc.setTransactionId(txnMgr.getCurrentTxnId()); //it's possible to have > 1 FileSink writing to the same table/partition //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 739ce18..b000745 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -289,10 +289,19 @@ public class MoveTask extends Task<MoveWork> implements Serializable { Utilities.LOG14535.info("MoveTask not moving LFD " + sourcePath); } else { Utilities.LOG14535.info("MoveTask moving LFD " + sourcePath + " to " + targetPath); - moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); + if(lfd.getWriteType() == AcidUtils.Operation.INSERT) { + //'targetPath' is table root of un-partitioned table/partition + //'sourcePath' result of 'select ...' 
part of CTAS statement + assert lfd.getIsDfsDir(); + FileSystem srcFs = sourcePath.getFileSystem(conf); + List<Path> newFiles = new ArrayList<>(); + Hive.moveAcidFiles(srcFs, srcFs.globStatus(sourcePath), targetPath, newFiles); + } + else { + moveFile(sourcePath, targetPath, lfd.getIsDfsDir()); + } } } - // Multi-file load is for dynamic partitions when some partitions do not // need to merge and they can simply be moved to the target directory. // This is also used for MM table conversion. http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 3de83c6..c2d4612 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -42,6 +42,8 @@ import org.apache.hadoop.hive.metastore.TransactionalValidationListener; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.plan.CreateTableDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId; import org.apache.hadoop.hive.shims.ShimLoader; @@ -1195,6 +1197,16 @@ public class AcidUtils { return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); } + public static boolean isAcidTable(CreateTableDesc table) { + if (table == null || table.getTblProps() == null) { + return false; + } + String tableIsTransactional = table.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (tableIsTransactional == null) { + tableIsTransactional = table.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); + } + return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); + } public static boolean isFullAcidTable(Table table) { return isAcidTable(table) && !MetaStoreUtils.isInsertOnlyTable(table.getParameters()); http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index e51e648..d44d081 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -134,6 +134,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; import org.apache.hadoop.hive.ql.exec.FunctionTask; import org.apache.hadoop.hive.ql.exec.FunctionUtils; @@ -3612,12 +3613,14 @@ private void constructOneLBLocationMap(FileStatus fSta, } } - private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, + public static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst, List<Path> newFiles) throws HiveException { // The layout for ACID files is 
table|partname/base|delta|delete_delta/bucket - // We will always only be writing delta files. In the buckets created by FileSinkOperator - // it will look like bucket/delta|delete_delta/bucket. So we need to move that into - // the above structure. For the first mover there will be no delta directory, + // We will always only be writing delta files ( except IOW which writes base_X/ ). + // In the buckets created by FileSinkOperator + // it will look like original_bucket/delta|delete_delta/bucket + // (e.g. .../-ext-10004/000000_0/delta_0000014_0000014_0000/bucket_00000). So we need to + // move that into the above structure. For the first mover there will be no delta directory, // so we can move the whole directory. // For everyone else we will need to just move the buckets under the existing delta // directory. @@ -3632,6 +3635,36 @@ private void constructOneLBLocationMap(FileStatus fSta, FileStatus[] origBucketStats = null; try { origBucketStats = fs.listStatus(srcPath, AcidUtils.originalBucketFilter); + if(origBucketStats == null || origBucketStats.length == 0) { + /** + check if we are dealing with data with non-standard layout. For example a write + produced by a (optimized) Union All query + which looks like + âââ -ext-10000 + âââ HIVE_UNION_SUBDIR_1 + â  âââ 000000_0 + â  âââ _orc_acid_version + â  âââ delta_0000019_0000019_0001 + â  âââ bucket_00000 + âââ HIVE_UNION_SUBDIR_2 + â  âââ 000000_0 + â  âââ _orc_acid_version + â  âââ delta_0000019_0000019_0002 + â  âââ bucket_00000 + The assumption is that we either have all data in subdirs or root of srcPath + but not both. + For Union case, we expect delta dirs to have unique names which is assured by + {@link org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor} + */ + FileStatus[] unionSubdirs = fs.globStatus(new Path(srcPath, + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "[0-9]*")); + List<FileStatus> buckets = new ArrayList<>(); + for(FileStatus unionSubdir : unionSubdirs) { + Collections.addAll(buckets, + fs.listStatus(unionSubdir.getPath(), AcidUtils.originalBucketFilter)); + } + origBucketStats = buckets.toArray(new FileStatus[buckets.size()]); + } } catch (IOException e) { String msg = "Unable to look for bucket files in src path " + srcPath.toUri().toString(); LOG.error(msg); @@ -3645,7 +3678,7 @@ private void constructOneLBLocationMap(FileStatus fSta, fs, dst, origBucketPath, createdDeltaDirs, newFiles); moveAcidFiles(AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.deleteEventDeltaDirFilter, fs, dst,origBucketPath, createdDeltaDirs, newFiles); - moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter, + moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter,//for Insert Overwrite fs, dst, origBucketPath, createdDeltaDirs, newFiles); } } http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java new file mode 100644 index 0000000..b5bc386 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java @@ -0,0 +1,166 @@ +package org.apache.hadoop.hive.ql.optimizer; + +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.Task; 
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork; +import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork; +import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.parse.GenTezProcContext; +import org.apache.hadoop.hive.ql.parse.GenTezWork; +import org.apache.hadoop.hive.ql.parse.spark.GenSparkWork; +import org.apache.hadoop.hive.ql.plan.ArchiveWork; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork; +import org.apache.hadoop.hive.ql.plan.ColumnStatsWork; +import org.apache.hadoop.hive.ql.plan.ConditionalWork; +import org.apache.hadoop.hive.ql.plan.CopyWork; +import org.apache.hadoop.hive.ql.plan.DDLWork; +import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; +import org.apache.hadoop.hive.ql.plan.ExplainSQRewriteWork; +import org.apache.hadoop.hive.ql.plan.ExplainWork; +import org.apache.hadoop.hive.ql.plan.FetchWork; +import org.apache.hadoop.hive.ql.plan.FileSinkDesc; +import org.apache.hadoop.hive.ql.plan.FunctionWork; +import org.apache.hadoop.hive.ql.plan.MapredLocalWork; +import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MoveWork; +import org.apache.hadoop.hive.ql.plan.SparkWork; +import org.apache.hadoop.hive.ql.plan.StatsNoJobWork; +import org.apache.hadoop.hive.ql.plan.StatsWork; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Set; + +/** + * Finds Acid FileSinkDesc objects which can be created in the physical (disconnected) plan, e.g. + * {@link org.apache.hadoop.hive.ql.parse.GenTezUtils#removeUnionOperators(GenTezProcContext, BaseWork, int)} + * so that statementId can be properly assigned to ensure unique ROW__IDs + * {@link org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory} is another example where + * Union All optimizations create new FileSinkDescS + */ +public class QueryPlanPostProcessor { + private static final Logger LOG = LoggerFactory.getLogger(QueryPlanPostProcessor.class); + + public QueryPlanPostProcessor(List<Task<?>> rootTasks, Set<FileSinkDesc> acidSinks, String executionId) { + for(Task<?> t : rootTasks) { + //Work + Object work = t.getWork(); + if(work instanceof TezWork) { + for(BaseWork bw : ((TezWork)work).getAllWorkUnsorted()) { + collectFileSinkDescs(bw.getAllLeafOperators(), acidSinks); + } + } + else if(work instanceof BaseWork) { + collectFileSinkDescs(((BaseWork)work).getAllLeafOperators(), acidSinks); + } + else if(work instanceof MapredWork) { + MapredWork w = (MapredWork)work; + if(w.getMapWork() != null) { + collectFileSinkDescs(w.getMapWork().getAllLeafOperators(), acidSinks); + } + if(w.getReduceWork() != null) { + collectFileSinkDescs(w.getReduceWork().getAllLeafOperators(), acidSinks); + } + } + else if(work instanceof SparkWork) { + for(BaseWork bw : ((SparkWork)work).getRoots()) { + collectFileSinkDescs(bw.getAllLeafOperators(), acidSinks); + } + } + else if(work instanceof MapredLocalWork) { + //I don't think this can have any FileSinkOperatorS - more future proofing + Set<FileSinkOperator> fileSinkOperatorSet = OperatorUtils.findOperators(((MapredLocalWork)work).getAliasToWork().values(), FileSinkOperator.class); + for(FileSinkOperator fsop : fileSinkOperatorSet) { + collectFileSinkDescs(fsop, acidSinks); + } + } + else if(work instanceof ExplainWork) { + new 
QueryPlanPostProcessor(((ExplainWork)work).getRootTasks(), acidSinks, executionId); + } + /* + ekoifman:~ ekoifman$ cd dev/hiverwgit/ql/src/java/org/apache/ +ekoifman:apache ekoifman$ find . -name *Work.java +./hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java +./hadoop/hive/ql/exec/repl/ReplDumpWork.java +./hadoop/hive/ql/exec/repl/ReplStateLogWork.java +./hadoop/hive/ql/index/IndexMetadataChangeWork.java +./hadoop/hive/ql/io/merge/MergeFileWork.java - extends MapWork +./hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java - extends MapWork +./hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java - extends MapWork +./hadoop/hive/ql/parse/GenTezWork.java +./hadoop/hive/ql/parse/spark/GenSparkWork.java +./hadoop/hive/ql/plan/ArchiveWork.java +./hadoop/hive/ql/plan/BaseWork.java +./hadoop/hive/ql/plan/ColumnStatsUpdateWork.java +./hadoop/hive/ql/plan/ColumnStatsWork.java +./hadoop/hive/ql/plan/ConditionalWork.java +./hadoop/hive/ql/plan/CopyWork.java +./hadoop/hive/ql/plan/DDLWork.java +./hadoop/hive/ql/plan/DependencyCollectionWork.java +./hadoop/hive/ql/plan/ExplainSQRewriteWork.java +./hadoop/hive/ql/plan/ExplainWork.java +./hadoop/hive/ql/plan/FetchWork.java +./hadoop/hive/ql/plan/FunctionWork.java +./hadoop/hive/ql/plan/MapredLocalWork.java +./hadoop/hive/ql/plan/MapredWork.java +./hadoop/hive/ql/plan/MapWork.java - extends BaseWork +./hadoop/hive/ql/plan/MergeJoinWork.java - extends BaseWork +./hadoop/hive/ql/plan/MoveWork.java +./hadoop/hive/ql/plan/ReduceWork.java +./hadoop/hive/ql/plan/ReplCopyWork.java - extends CopyWork +./hadoop/hive/ql/plan/SparkWork.java +./hadoop/hive/ql/plan/StatsNoJobWork.java +./hadoop/hive/ql/plan/StatsWork.java +./hadoop/hive/ql/plan/TezWork.java +./hadoop/hive/ql/plan/UnionWork.java - extends BaseWork + */ + else if(work instanceof ReplLoadWork || + work instanceof ReplStateLogWork || + work instanceof IndexMetadataChangeWork || + work instanceof GenTezWork || + work instanceof GenSparkWork || + work instanceof ArchiveWork || + work instanceof ColumnStatsUpdateWork || + work instanceof ColumnStatsWork || + work instanceof ConditionalWork || + work instanceof CopyWork || + work instanceof DDLWork || + work instanceof DependencyCollectionWork || + work instanceof ExplainSQRewriteWork || + work instanceof FetchWork || + work instanceof FunctionWork || + work instanceof MoveWork || + work instanceof StatsNoJobWork || + work instanceof StatsWork) { + LOG.debug("Found " + work.getClass().getName() + " - no FileSinkOperation can be present. executionId=" + executionId); + } + else { + //if here, someone must have added new Work object - should it be walked to find FileSinks? 
+ throw new IllegalArgumentException("Unexpected Work object: " + work.getClass() + " executionId=" + executionId); + } + } + } + private void collectFileSinkDescs(Operator<?> leaf, Set<FileSinkDesc> acidSinks) { + if(leaf instanceof FileSinkOperator) { + FileSinkDesc fsd = ((FileSinkOperator) leaf).getConf(); + if(fsd.getWriteType() != AcidUtils.Operation.NOT_ACID) { + if(acidSinks.add(fsd)) { + if(LOG.isDebugEnabled()) { + LOG.debug("Found Acid Sink: " + fsd.getDirName()); + } + } + } + } + } + private void collectFileSinkDescs(Set<Operator<?>> leaves, Set<FileSinkDesc> acidSinks) { + for(Operator<?> leaf : leaves) { + collectFileSinkDescs(leaf, acidSinks); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java index 3a38a6d..bc26c5e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Stack; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; @@ -220,7 +221,7 @@ public final class UnionProcFactory { for (Operator<? extends OperatorDesc> parent : parents) { FileSinkDesc fileSinkDesc = (FileSinkDesc) fileSinkOp.getConf().clone(); - fileSinkDesc.setDirName(new Path(parentDirName, parent.getIdentifier())); + fileSinkDesc.setDirName(new Path(parentDirName, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + parent.getIdentifier())); fileSinkDesc.setLinkedFileSink(true); Utilities.LOG14535.info("Created LinkedFileSink for union " + fileSinkDesc.getDirName() + "; parent " + parentDirName); parent.setChildOperators(null); http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index f50b57e..3475c7c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -131,6 +131,7 @@ import org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.optimizer.Optimizer; +import org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor; import org.apache.hadoop.hive.ql.optimizer.Transform; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException; import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature; @@ -7003,6 +7004,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { viewDesc.setSchema(new ArrayList<FieldSchema>(field_schemas)); } + destTableIsAcid = tblDesc != null && AcidUtils.isAcidTable(tblDesc); + boolean isDestTempFile = true; if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) { 
idToTableNameMap.put(String.valueOf(destTableId), dest_path.toUri().toString()); @@ -7013,9 +7016,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE); // Create LFD even for MM CTAS - it's a no-op move, but it still seems to be used for stats. - loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, - queryTmpdir, dest_path, isDfsDir, cols, colTypes, isMmCtas)); - + loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, dest_path, isDfsDir, cols, + colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID, isMmCtas)); if (tblDesc == null) { if (viewDesc != null) { table_desc = PlanUtils.getTableDesc(viewDesc, cols, colTypes); @@ -7104,7 +7106,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part, dest_path, currentTableId, destTableIsAcid, destTableIsTemporary, destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, - canBeMerged, dest_tab, txnId, isMmCtas); + canBeMerged, dest_tab, txnId, isMmCtas, dest_type); if (isMmCtas) { // Add FSD so that the LoadTask compilation could fix up its path to avoid the move. tableDesc.setWriter(fileSinkDesc); @@ -7212,7 +7214,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { boolean destTableIsAcid, boolean destTableIsTemporary, boolean destTableIsMaterialization, Path queryTmpdir, SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx, - RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas) throws SemanticException { + RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas, + Integer dest_type) throws SemanticException { FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc, conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(), canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx, @@ -7236,10 +7239,22 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT); fileSinkDesc.setWriteType(wt); - String destTableFullName = dest_tab.getCompleteName().replace('@', '.'); - Map<String, ASTNode> iowMap = qb.getParseInfo().getInsertOverwriteTables(); - if (iowMap.containsKey(destTableFullName)) { - fileSinkDesc.setInsertOverwrite(true); + switch (dest_type) { + case QBMetaData.DEST_PARTITION: + //fall through + case QBMetaData.DEST_TABLE: + //INSERT [OVERWRITE] path + String destTableFullName = dest_tab.getCompleteName().replace('@', '.'); + Map<String, ASTNode> iowMap = qb.getParseInfo().getInsertOverwriteTables(); + if (iowMap.containsKey(destTableFullName)) { + fileSinkDesc.setInsertOverwrite(true); + } + break; + case QBMetaData.DEST_DFS_FILE: + //CTAS path + break; + default: + throw new IllegalStateException("Unexpected dest_type=" + dest_tab); } acidFileSinks.add(fileSinkDesc); } @@ -11555,6 +11570,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { compiler.compile(pCtx, rootTasks, inputs, outputs); fetchTask = pCtx.getFetchTask(); } + //find all Acid FileSinkOperatorS + QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks, ctx.getExecutionId()); LOG.info("Completed plan generation"); // 10. 
put accessed columns to readEntity http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java index d708df3..45d4fb0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.plan.Explain.Level; /** @@ -30,12 +31,15 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; public class LoadDesc implements Serializable { private static final long serialVersionUID = 1L; private Path sourcePath; + /** + * Need to remember whether this is an acid compliant operation, and if so whether it is an + * insert, update, or delete. + */ + private final AcidUtils.Operation writeType; - public LoadDesc() { - } - - public LoadDesc(final Path sourcePath) { + public LoadDesc(final Path sourcePath, AcidUtils.Operation writeType) { this.sourcePath = sourcePath; + this.writeType = writeType; } @Explain(displayName = "source", explainLevels = { Level.EXTENDED }) @@ -46,4 +50,14 @@ public class LoadDesc implements Serializable { public void setSourcePath(Path sourcePath) { this.sourcePath = sourcePath; } + + public AcidUtils.Operation getWriteType() { + return writeType; + } + + @Explain(displayName = "Write Type") + public String getWriteTypeString() { + //if acid write, add to plan output, else don't bother + return getWriteType() == AcidUtils.Operation.NOT_ACID ? null : getWriteType().toString(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java index 6fad710..0032648 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java @@ -21,8 +21,8 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.PTFUtils; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.io.AcidUtils; /** * LoadFileDesc. 
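As a usage sketch (not part of this patch): because LoadDesc now carries the write type, plan consumers such as MoveTask can tell acid loads apart from plain loads without re-resolving the target table, and NOT_ACID keeps the "Write Type" attribute out of EXPLAIN output per the comment above. The helper name isAcidLoad is illustrative, not an existing Hive method.

import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.plan.LoadDesc;

final class LoadDescUsage {
  // True when the load was produced by an acid INSERT/UPDATE/DELETE.
  static boolean isAcidLoad(LoadDesc desc) {
    return desc.getWriteType() != AcidUtils.Operation.NOT_ACID;
  }
}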
@@ -38,11 +38,8 @@ public class LoadFileDesc extends LoadDesc implements Serializable { private String destinationCreateTable; private boolean isMmCtas; - public LoadFileDesc() { - } - public LoadFileDesc(final LoadFileDesc o) { - super(o.getSourcePath()); + super(o.getSourcePath(), o.getWriteType()); this.targetDir = o.targetDir; this.isDfsDir = o.isDfsDir; @@ -54,8 +51,8 @@ public class LoadFileDesc extends LoadDesc implements Serializable { public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc createViewDesc, final Path sourcePath, final Path targetDir, final boolean isDfsDir, - final String columns, final String columnTypes, boolean isMmCtas) { - this(sourcePath, targetDir, isDfsDir, columns, columnTypes, isMmCtas); + final String columns, final String columnTypes, AcidUtils.Operation writeType, boolean isMmCtas) { + this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType, isMmCtas); if (createTableDesc != null && createTableDesc.getDatabaseName() != null && createTableDesc.getTableName() != null) { destinationCreateTable = (createTableDesc.getTableName().contains(".") ? "" : createTableDesc @@ -68,9 +65,14 @@ public class LoadFileDesc extends LoadDesc implements Serializable { } } - public LoadFileDesc(final Path sourcePath, final Path targetDir, final boolean isDfsDir, - final String columns, final String columnTypes, boolean isMmCtas) { - super(sourcePath); + public LoadFileDesc(final Path sourcePath, final Path targetDir, + final boolean isDfsDir, final String columns, final String columnTypes, boolean isMmCtas) { + this(sourcePath, targetDir, isDfsDir, columns, columnTypes, AcidUtils.Operation.NOT_ACID, isMmCtas); + } + private LoadFileDesc(final Path sourcePath, final Path targetDir, + final boolean isDfsDir, final String columns, + final String columnTypes, AcidUtils.Operation writeType, boolean isMmCtas) { + super(sourcePath, writeType); Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir); this.targetDir = targetDir; this.isDfsDir = isDfsDir; http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index 3201dc9..e893ab5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -32,17 +32,13 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; * LoadTableDesc. * */ -public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc - implements Serializable { +public class LoadTableDesc extends LoadDesc implements Serializable { private static final long serialVersionUID = 1L; private boolean replace; private DynamicPartitionCtx dpCtx; private ListBucketingCtx lbCtx; private boolean inheritTableSpecs = true; //For partitions, flag controlling whether the current //table specs are to be used - // Need to remember whether this is an acid compliant operation, and if so whether it is an - // insert, update, or delete. 
- private AcidUtils.Operation writeType; private Long txnId; private int stmtId; @@ -52,13 +48,12 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc private boolean commitMmWriteId = true; public LoadTableDesc(final LoadTableDesc o) { - super(o.getSourcePath()); + super(o.getSourcePath(), o.getWriteType()); this.replace = o.replace; this.dpCtx = o.dpCtx; this.lbCtx = o.lbCtx; this.inheritTableSpecs = o.inheritTableSpecs; - this.writeType = o.writeType; this.table = o.table; this.partitionSpec = o.partitionSpec; } @@ -67,11 +62,12 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc final org.apache.hadoop.hive.ql.plan.TableDesc table, final Map<String, String> partitionSpec, final boolean replace, - final AcidUtils.Operation writeType, Long txnId) { - super(sourcePath); + final AcidUtils.Operation writeType, + Long txnId) { + super(sourcePath, writeType); Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + ((table.getProperties() == null) ? "null" : table.getTableName())); - init(table, partitionSpec, replace, writeType, txnId); + init(table, partitionSpec, replace, txnId); } /** @@ -113,13 +109,13 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc final DynamicPartitionCtx dpCtx, final AcidUtils.Operation writeType, boolean isReplace, Long txnId) { - super(sourcePath); + super(sourcePath, writeType); Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/); this.dpCtx = dpCtx; if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) { - init(table, dpCtx.getPartSpec(), isReplace, writeType, txnId); + init(table, dpCtx.getPartSpec(), isReplace, txnId); } else { - init(table, new LinkedHashMap<String, String>(), isReplace, writeType, txnId); + init(table, new LinkedHashMap<String, String>(), isReplace, txnId); } } @@ -127,11 +123,10 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc final org.apache.hadoop.hive.ql.plan.TableDesc table, final Map<String, String> partitionSpec, final boolean replace, - AcidUtils.Operation writeType, Long txnId) { + Long txnId) { this.table = table; this.partitionSpec = partitionSpec; this.replace = replace; - this.writeType = writeType; this.txnId = txnId; } @@ -201,10 +196,6 @@ public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc this.lbCtx = lbCtx; } - public AcidUtils.Operation getWriteType() { - return writeType; - } - public Long getTxnId() { return txnId; } http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java index b695f0f..bbed9be 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java @@ -22,12 +22,12 @@ import org.apache.hadoop.hive.conf.HiveConf; public class TezEdgeProperty { - public enum EdgeType { - SIMPLE_EDGE, + public enum EdgeType {//todo: HIVE-15549 + SIMPLE_EDGE,//SORT_PARTITION_EDGE BROADCAST_EDGE, - CONTAINS, - CUSTOM_EDGE, - CUSTOM_SIMPLE_EDGE, + CONTAINS,//used for union (all?) 
+ CUSTOM_EDGE,//CO_PARTITION_EDGE + CUSTOM_SIMPLE_EDGE,//PARTITION_EDGE ONE_TO_ONE_EDGE } http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java index 1f0c269..880329d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java @@ -5,7 +5,10 @@ import org.apache.hadoop.hive.ql.io.BucketCodec; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,6 +23,8 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests { File.separator + TestTxnNoBuckets.class.getCanonicalName() + "-" + System.currentTimeMillis() ).getPath().replaceAll("\\\\", "/"); + @Rule + public TestName testName = new TestName(); @Override String getTestDataDir() { return TEST_DATA_DIR; @@ -65,17 +70,7 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests { Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001")); Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t")); Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001")); - /*todo: WTF? - RS for update seems to spray randomly... is that OK? maybe as long as all resultant files have different names... will they? - Assuming we name them based on taskId, we should create bucketX and bucketY. - we delete events can be written to bucketX file it could be useful for filter delete for a split by file name since the insert - events seem to be written to a proper bucketX file. In fact this may reduce the number of changes elsewhere like compactor... maybe - But this limits the parallelism - what is worse, you don't know what the parallelism should be until you have a list of all the - input files since bucket count is no longer a metadata property. Also, with late Update split, the file name has already been determined - from taskId so the Insert part won't end up matching the bucketX property necessarily. - With early Update split, the Insert can still be an insert - i.e. go to appropriate bucketX. But deletes will still go wherever (random shuffle) - unless you know all the bucketX files to be read - may not be worth the trouble. - * 2nd: something in FS fails. 
ArrayIndexOutOfBoundsException: 1 at FileSinkOperator.process(FileSinkOperator.java:779)*/ + runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)"); rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME from nobuckets order by INPUT__FILE__NAME, ROW__ID"); LOG.warn("after update"); @@ -152,15 +147,6 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests { } /** - * all of these pass but don't do exactly the right thing - * files land as if it's not an acid table "warehouse/myctas4/000000_0" - * even though in {@link org.apache.hadoop.hive.metastore.TransactionalValidationListener} fires - * and sees it as transactional table - * look for QB.isCTAS() and CreateTableDesc() in SemanticAnalyzer - * - * On read, these files are treated like non acid to acid conversion - * - * see HIVE-15899 * See CTAS tests in TestAcidOnTez */ @Test @@ -169,30 +155,177 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests { runStatementOnDriver("insert into " + Table.NONACIDORCTBL + makeValuesClause(values)); runStatementOnDriver("create table myctas stored as ORC TBLPROPERTIES ('transactional" + "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL); - List<String> rs = runStatementOnDriver("select * from myctas order by a, b"); - Assert.assertEquals(stringifyValues(values), rs); + List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas order by ROW__ID"); + String expected[][] = { + {"{\"transactionid\":14,\"bucketid\":536870912,\"rowid\":0}\t3\t4", "warehouse/myctas/delta_0000014_0000014_0000/bucket_00000"}, + {"{\"transactionid\":14,\"bucketid\":536870912,\"rowid\":1}\t1\t2", "warehouse/myctas/delta_0000014_0000014_0000/bucket_00000"}, + }; + checkExpected(rs, expected, "Unexpected row count after ctas from non acid table"); runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values)); runStatementOnDriver("create table myctas2 stored as ORC TBLPROPERTIES ('transactional" + "'='true', 'transactional_properties'='default') as select a, b from " + Table.ACIDTBL); - rs = runStatementOnDriver("select * from myctas2 order by a, b"); - Assert.assertEquals(stringifyValues(values), rs); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas2 order by ROW__ID"); + String expected2[][] = { + {"{\"transactionid\":17,\"bucketid\":536870912,\"rowid\":0}\t3\t4", "warehouse/myctas2/delta_0000017_0000017_0000/bucket_00000"}, + {"{\"transactionid\":17,\"bucketid\":536870912,\"rowid\":1}\t1\t2", "warehouse/myctas2/delta_0000017_0000017_0000/bucket_00000"}, + }; + checkExpected(rs, expected2, "Unexpected row count after ctas from acid table"); runStatementOnDriver("create table myctas3 stored as ORC TBLPROPERTIES ('transactional" + "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL + " union all select a, b from " + Table.ACIDTBL); - rs = runStatementOnDriver("select * from myctas3 order by a, b"); - Assert.assertEquals(stringifyValues(new int[][] {{1,2},{1,2},{3,4},{3,4}}), rs); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas3 order by ROW__ID"); + String expected3[][] = { + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000019_0000019_0000/bucket_00000"}, + {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t1\t2", "warehouse/myctas3/delta_0000019_0000019_0000/bucket_00000"}, + 
{"{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":0}\t3\t4", "warehouse/myctas3/delta_0000019_0000019_0000/bucket_00001"}, + {"{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t1\t2", "warehouse/myctas3/delta_0000019_0000019_0000/bucket_00001"}, + }; + checkExpected(rs, expected3, "Unexpected row count after ctas from union all query"); runStatementOnDriver("create table myctas4 stored as ORC TBLPROPERTIES ('transactional" + "'='true', 'transactional_properties'='default') as select a, b from " + Table.NONACIDORCTBL + " union distinct select a, b from " + Table.ACIDTBL); - rs = runStatementOnDriver("select * from myctas4 order by a, b"); + rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from myctas4 order by ROW__ID"); + String expected4[][] = { + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "/delta_0000021_0000021_0000/bucket_00000"}, + {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "/delta_0000021_0000021_0000/bucket_00000"}, + }; + checkExpected(rs, expected4, "Unexpected row count after ctas from union distinct query"); + } + /** + * Insert into unbucketed acid table from union all query + * Union All is flattend so nested subdirs are created and acid move drops them since + * delta dirs have unique names + */ + @Test + public void testInsertToAcidWithUnionRemove() throws Exception { + hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true); + hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + d.close(); + d = new Driver(hiveConf); + int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}}; + runStatementOnDriver("insert into " + TxnCommandsBaseForTests.Table.ACIDTBL + makeValuesClause(values));//HIVE-17138: this creates 1 delta_0000013_0000013_0000/bucket_00001 + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')"); + /* + So Union All removal kicks in and we get 3 subdirs in staging. 
+ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505516390532/warehouse/t/.hive-staging_hive_2017-09-15_16-05-06_895_1123322677843388168-1/ +└── -ext-10000 + ├── HIVE_UNION_SUBDIR_19 + │  ├── 000000_0 + │  ├── _orc_acid_version + │  └── delta_0000016_0000016_0001 + ├── HIVE_UNION_SUBDIR_20 + │  ├── 000000_0 + │  ├── _orc_acid_version + │  └── delta_0000016_0000016_0002 + └── HIVE_UNION_SUBDIR_21 + ├── 000000_0 + ├── _orc_acid_version + └── delta_0000016_0000016_0003*/ + runStatementOnDriver("insert into T(a,b) select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9"); + + List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by ROW__ID"); + + String expected[][] = { + {"{\"transactionid\":16,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000016_0000016_0001/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000016_0000016_0001/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870914,\"rowid\":0}\t7\t8", "/delta_0000016_0000016_0002/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870914,\"rowid\":1}\t5\t6", "/delta_0000016_0000016_0002/bucket_00000"}, + {"{\"transactionid\":16,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "/delta_0000016_0000016_0003/bucket_00000"}, + }; + checkExpected(rs, expected, "Unexpected row count after ctas"); + } + private void checkExpected(List<String> rs, String[][] expected, String msg) { + LOG.warn(testName.getMethodName() + ": read data(" + msg + "): "); + for(String s : rs) { + LOG.warn(s); + } + Assert.assertEquals( testName.getMethodName() + ": " + msg, expected.length, rs.size()); + //verify data and layout + for(int i = 0; i < expected.length; i++) { + Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected[i][0])); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith(expected[i][1])); + } + } + /** + * The idea here is to create a non acid table that was written by multiple writers, i.e. + * unbucketed table that has 000000_0 & 000001_0, for example.
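The bucketid values embedded in the expected ROW__ID strings above (536870912, 536936448, 536870913, ...) are encoded bucket properties; the real decoder is BucketCodec, which this test already imports. A self-contained sketch of the decoding, assuming the V1 layout of 3 version bits, 1 reserved bit, 12 bucket/writer-id bits, 4 reserved bits and 12 statement-id bits:

public class BucketPropertyDecode {
  public static void main(String[] args) {
    int[] samples = {536870912, 536936448, 536870913, 536870914, 536870915};
    for (int p : samples) {
      int version = (p >>> 29) & 0x7;     // top 3 bits: codec version
      int bucketId = (p >>> 16) & 0xFFF;  // 12 bits: bucket/writer id
      int stmtId = p & 0xFFF;             // low 12 bits: statement id
      System.out.println(p + " -> version=" + version
          + " bucket=" + bucketId + " statementId=" + stmtId);
    }
    // 536870912 -> bucket 0 and 536936448 -> bucket 1 (statement id 0), while
    // 536870913/14/15 -> bucket 0 with statement ids 1/2/3, lining up with the
    // delta_0000016_0000016_0001/_0002/_0003 suffixes in the expected file names.
  }
}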
Unfortunately this doesn't work + * due to 'merge' logic - see comments in the method + */ + @Ignore + @Test + public void testToAcidConversionMultiBucket() throws Exception { + int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; + runStatementOnDriver("insert into " + Table.ACIDTBL + makeValuesClause(values)); + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='false')"); + /*T non-acid + non bucketd - 3 writers are created and then followed by merge to create a single output file + though how the data from union is split between writers is a mystery + (bucketed tables don't do merge) + Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10000/000000_0 [length: 515] +{"a":6,"b":8} +{"a":9,"b":10} +{"a":5,"b":6} +{"a":1,"b":2} +{"a":2,"b":4} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000000_0 [length: 242] +{"a":6,"b":8} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000001_0 [length: 244] +{"a":9,"b":10} +{"a":5,"b":6} +________________________________________________________________________________________________________________________ + +Processing data file file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000002_0 [length: 242] +{"a":1,"b":2} +{"a":2,"b":4} + */ + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + Table.ACIDTBL + " where a >= 9"); + List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); + LOG.warn("before converting to acid"); + for(String s : rs) { + LOG.warn(s); + } + } + @Test + public void testInsertFromUnion() throws Exception { + int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}}; + runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + makeValuesClause(values)); + runStatementOnDriver("drop table if exists T"); + runStatementOnDriver("create table T (a int, b int) stored as ORC TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("insert into T(a,b) select a, b from " + Table.NONACIDNONBUCKET + " where a between 1 and 3 group by a, b union all select a, b from " + Table.NONACIDNONBUCKET + " where a between 5 and 7 union all select a, b from " + Table.NONACIDNONBUCKET + " where a >= 9"); + List<String> rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME"); + LOG.warn("before converting to acid"); + for(String s : rs) { + LOG.warn(s); + } + /* + The number of writers seems to be based on number 
of MR jobs for the src query. todo check number of FileSinks + warehouse/t/.hive-staging_hive_2017-09-13_08-59-28_141_6304543600372946004-1/-ext-10000/000000_0/delta_0000016_0000016_0000/bucket_00000 [length: 648] + {"operation":0,"originalTransaction":16,"bucket":536870912,"rowId":0,"currentTransaction":16,"row":{"_col0":1,"_col1":2}} + {"operation":0,"originalTransaction":16,"bucket":536870912,"rowId":1,"currentTransaction":16,"row":{"_col0":2,"_col1":4}} + ________________________________________________________________________________________________________________________ + warehouse/t/.hive-staging_hive_2017-09-13_08-59-28_141_6304543600372946004-1/-ext-10000/000001_0/delta_0000016_0000016_0000/bucket_00001 [length: 658] + {"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":0,"currentTransaction":16,"row":{"_col0":5,"_col1":6}} + {"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":1,"currentTransaction":16,"row":{"_col0":6,"_col1":8}} + {"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":2,"currentTransaction":16,"row":{"_col0":9,"_col1":10}} + */ + rs = runStatementOnDriver("select a, b from T order by a, b"); Assert.assertEquals(stringifyValues(values), rs); + rs = runStatementOnDriver("select ROW__ID from T group by ROW__ID having count(*) > 1"); + if(rs.size() > 0) { + Assert.assertEquals("Duplicate ROW__IDs: " + rs.get(0), 0, rs.size()); + } } /** * see HIVE-16177 - * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()} todo need test with > 1 bucket file + * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()} */ @Test public void testToAcidConversion02() throws Exception {