Cherry-picked HIVE-15899 from master

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5b3c7979
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5b3c7979
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5b3c7979

Branch: refs/heads/hive-14535
Commit: 5b3c79798e28fd3c7f5718e361efee0f77eec047
Parents: 6f0be79
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Tue Sep 26 19:42:06 2017 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Tue Sep 26 19:48:41 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/TestAcidOnTez.java    | 329 +++++++++++++++----
 .../java/org/apache/hadoop/hive/ql/Context.java |   3 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   6 +-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |  13 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  12 +
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  43 ++-
 .../ql/optimizer/QueryPlanPostProcessor.java    | 166 ++++++++++
 .../optimizer/unionproc/UnionProcFactory.java   |   3 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |  35 +-
 .../apache/hadoop/hive/ql/plan/LoadDesc.java    |  22 +-
 .../hadoop/hive/ql/plan/LoadFileDesc.java       |  22 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java      |  29 +-
 .../hadoop/hive/ql/plan/TezEdgeProperty.java    |  10 +-
 .../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 189 +++++++++--
 .../clientpositive/autoColumnStats_4.q.out      |   1 +
 .../clientpositive/llap/acid_no_buckets.q.out   |   8 +
 .../llap/dynamic_semijoin_reduction_3.q.out     |   7 +
 .../llap/dynpart_sort_optimization_acid.q.out   |  12 +
 .../results/clientpositive/llap/mm_all.q.out    |   1 +
 .../results/clientpositive/llap/sqlmerge.q.out  |   4 +
 ql/src/test/results/clientpositive/mm_all.q.out |   1 +
 21 files changed, 760 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index d0b5cf6..8b4b21f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -33,6 +33,7 @@ import 
org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
@@ -88,7 +89,6 @@ public class TestAcidOnTez {
     hiveConf = new HiveConf(this.getClass());
     hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
     hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
-    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
     hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, 
TEST_WAREHOUSE_DIR);
     hiveConf.setVar(HiveConf.ConfVars.HIVEMAPREDMODE, "nonstrict");
     hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, 
HiveInputFormat.class.getName());
@@ -179,14 +179,14 @@ public class TestAcidOnTez {
     runStatementOnDriver("create table " + Table.NONACIDNONBUCKET + " stored 
as ORC TBLPROPERTIES('transactional'='false') as " +
       "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, 
b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez);
 
-    List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME 
from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME");
+    List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME 
from " + Table.NONACIDNONBUCKET + " order by a, b, INPUT__FILE__NAME", 
confForTez);
     String expected0[][] = {
-      {"1\t2", "/1/000000_0"},
-      {"3\t4", "/1/000000_0"},
-      {"5\t6", "/1/000000_0"},
-      {"5\t6", "/2/000000_0"},
-      {"7\t8", "/2/000000_0"},
-      {"9\t10", "/2/000000_0"},
+      {"1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
+      {"3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
+      {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
+      {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"},
+      {"7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"},
+      {"9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"},
     };
     Assert.assertEquals("Unexpected row count after ctas", expected0.length, 
rs.size());
     //verify data and layout
@@ -195,9 +195,9 @@ public class TestAcidOnTez {
       Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), 
rs.get(i).endsWith(expected0[i][1]));
     }
     //make the table ACID
-    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET 
TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " SET 
TBLPROPERTIES ('transactional'='true')", confForTez);
 
-    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.NONACIDNONBUCKET + " order by ROW__ID");
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez);
     LOG.warn("after ctas:");
     for (String s : rs) {
       LOG.warn(s);
@@ -206,12 +206,12 @@ public class TestAcidOnTez {
     /*
     * Expected result 0th entry i the RecordIdentifier + data.  1st entry file 
before compact*/
     String expected[][] = {
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", 
"/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", 
"/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", 
"/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", 
"/2/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", 
"/2/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", 
"/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", 
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", 
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", 
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", 
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", 
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", 
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"},
     };
     Assert.assertEquals("Unexpected row count after ctas", expected.length, 
rs.size());
     //verify data and layout
@@ -220,17 +220,17 @@ public class TestAcidOnTez {
       Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), 
rs.get(i).endsWith(expected[i][1]));
     }
     //perform some Update/Delete
-    runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set a = 70, b  
= 80 where a = 7");
-    runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 
5");
-    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.NONACIDNONBUCKET + " order by ROW__ID");
+    runStatementOnDriver("update " + Table.NONACIDNONBUCKET + " set a = 70, b  
= 80 where a = 7", confForTez);
+    runStatementOnDriver("delete from " + Table.NONACIDNONBUCKET + " where a = 
5", confForTez);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez);
     LOG.warn("after update/delete:");
     for (String s : rs) {
       LOG.warn(s);
     }
     String[][] expected2 = {
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", 
"/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", 
"/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", 
"/2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", 
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", 
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", 
AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"},
       {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t70\t80", 
"delta_0000021_0000021_0000/bucket_00000"}
     };
     Assert.assertEquals("Unexpected row count after update", expected2.length, 
rs.size());
@@ -255,9 +255,9 @@ public class TestAcidOnTez {
       Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on 
disk", expectedDelDelta[i]);
     }
     //run Minor compaction
-    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 
'minor'");
+    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 
'minor'", confForTez);
     TestTxnCommands2.runWorker(hiveConf);
-    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.NONACIDNONBUCKET + " order by ROW__ID");
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez);
     LOG.warn("after compact minor:");
     for (String s : rs) {
       LOG.warn(s);
@@ -285,9 +285,9 @@ public class TestAcidOnTez {
       Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found 
on disk", expectedDelDelta2[i]);
     }
     //run Major compaction
-    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 
'major'");
+    runStatementOnDriver("alter table " + Table.NONACIDNONBUCKET + " compact 
'major'", confForTez);
     TestTxnCommands2.runWorker(hiveConf);
-    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.NONACIDNONBUCKET + " order by ROW__ID");
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.NONACIDNONBUCKET + " order by ROW__ID", confForTez);
     LOG.warn("after compact major:");
     for (String s : rs) {
       LOG.warn(s);
@@ -306,8 +306,12 @@ public class TestAcidOnTez {
    * How to do this?  CTAS is the only way to create data files which are not 
immediate children
    * of the partition dir.  CTAS/Union/Tez doesn't support partition tables.  
The only way is to copy
    * data files in directly.
+   *
+   * Actually Insert Into ... select ... union all ... with
+   * HIVE_OPTIMIZE_UNION_REMOVE (and HIVEFETCHTASKCONVERSION="none"?) will 
create subdirs
+   * but if writing to non acid table there is a merge task on MR (but not on 
Tez)
    */
-  @Ignore("HIVE-17214")
+  @Ignore("HIVE-17214")//this consistently works locally but never in ptest....
   @Test
   public void testNonStandardConversion02() throws Exception {
     HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing 
hive conf
@@ -320,13 +324,13 @@ public class TestAcidOnTez {
       "union all select a, b from " + Table.ACIDTBL + " where a = 5", 
confForTez);
 
     List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME 
from " +
-      Table.NONACIDNONBUCKET + " order by a, b");
+      Table.NONACIDNONBUCKET + " order by a, b", confForTez);
     String expected0[][] = {
-      {"1\t2", "/1/000000_0"},
-      {"3\t4", "/1/000000_0"},
-      {"5\t6", "/3/000000_0"},
-      {"7\t8", "/2/000000_0"},
-      {"9\t10", "/2/000000_0"},
+      {"1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
+      {"3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"},
+      {"5\t6", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "3/000000_0"},
+      {"7\t8", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"},
+      {"9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"},
     };
     Assert.assertEquals("Unexpected row count after ctas", expected0.length, 
rs.size());
     //verify data and layout
@@ -338,7 +342,7 @@ public class TestAcidOnTez {
     FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
       (Table.NONACIDNONBUCKET).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
     //ensure there is partition dir
-    runStatementOnDriver("insert into " + Table.NONACIDPART + " partition 
(p=1) values (100,110)");
+    runStatementOnDriver("insert into " + Table.NONACIDPART + " partition 
(p=1) values (100,110)", confForTez);
     //creates more files in that partition
     for(FileStatus stat : status) {
       int limit = 5;
@@ -357,29 +361,29 @@ public class TestAcidOnTez {
     nonacidpart/
     └── p=1
     ├── 000000_0
-    ├── 1
+    ├── HIVE_UNION_SUBDIR__1
     │   └── 000000_0
-    ├── 2
+    ├── HIVE_UNION_SUBDIR_2
     │   └── 000000_0
-    └── 3
+    └── HIVE_UNION_SUBDIR_3
         └── 000000_0
 
 4 directories, 4 files
     **/
     //make the table ACID
-    runStatementOnDriver("alter table " + Table.NONACIDPART + " SET 
TBLPROPERTIES ('transactional'='true')");
-    rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from 
" + Table.NONACIDPART + " order by ROW__ID");
+    runStatementOnDriver("alter table " + Table.NONACIDPART + " SET 
TBLPROPERTIES ('transactional'='true')", confForTez);
+    rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from 
" + Table.NONACIDPART + " order by ROW__ID", confForTez);
     LOG.warn("after acid conversion:");
     for (String s : rs) {
       LOG.warn(s);
     }
     String[][] expected = {
       
{"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t100\t110\t1", 
"nonacidpart/p=1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", 
"nonacidpart/p=1/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", 
"nonacidpart/p=1/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", 
"nonacidpart/p=1/2/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", 
"nonacidpart/p=1/2/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", 
"nonacidpart/p=1/3/000000_0"}
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t1", 
"nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + 
"1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t3\t4\t1", 
"nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + 
"1/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10\t1", 
"nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + 
"2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8\t1", 
"nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + 
"2/000000_0"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6\t1", 
"nonacidpart/p=1/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + 
"3/000000_0"}
     };
     Assert.assertEquals("Wrong row count", expected.length, rs.size());
     //verify data and layout
@@ -389,9 +393,9 @@ public class TestAcidOnTez {
     }
 
     //run Major compaction
-    runStatementOnDriver("alter table " + Table.NONACIDPART + " partition 
(p=1) compact 'major'");
+    runStatementOnDriver("alter table " + Table.NONACIDPART + " partition 
(p=1) compact 'major'", confForTez);
     TestTxnCommands2.runWorker(hiveConf);
-    rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from 
" + Table.NONACIDPART + " order by ROW__ID");
+    rs = runStatementOnDriver("select ROW__ID, a, b, p, INPUT__FILE__NAME from 
" + Table.NONACIDPART + " order by ROW__ID", confForTez);
     LOG.warn("after major compaction:");
     for (String s : rs) {
       LOG.warn(s);
@@ -406,33 +410,42 @@ public class TestAcidOnTez {
   }
   /**
    * CTAS + Tez + Union creates a non-standard layout in table dir
-   * Each leg of the union places data into a subdir of the table/partition.  
Subdirs are named 1/, 2/, etc
-   * The way this currently works is that CTAS creates an Acid table but the 
insert statement writes
-   * the data in non-acid layout.  Then on read, it's treated like an non-acid 
to acid conversion.
-   * Longer term CTAS should create acid layout from the get-go.
+   * Each leg of the union places data into a subdir of the table/partition.
+   * Subdirs are named HIVE_UNION_SUBDIR_1/, HIVE_UNION_SUBDIR_2/, etc
+   * For Acid tables the writer for each dir must have a different statementId 
ensured by
+   * {@link org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor}.
+   * {@link org.apache.hadoop.hive.ql.metadata.Hive#moveAcidFiles(FileSystem, 
FileStatus[], Path, List)} drops the union subdirs
+   * since each delta file has a unique name.
    */
   @Test
   public void testCtasTezUnion() throws Exception {
     HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing 
hive conf
+    confForTez.setBoolVar(HiveConf.ConfVars.HIVE_EXPLAIN_USER, false);
     setupTez(confForTez);
     //CTAS with ACID target table
+    List<String> rs0 = runStatementOnDriver("explain create table " + 
Table.ACIDNOBUCKET + " stored as ORC TBLPROPERTIES('transactional'='true') as " 
+
+      "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, 
b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez);
+    LOG.warn("explain ctas:");//TezEdgeProperty.EdgeType
+    for (String s : rs0) {
+      LOG.warn(s);
+    }
     runStatementOnDriver("create table " + Table.ACIDNOBUCKET + " stored as 
ORC TBLPROPERTIES('transactional'='true') as " +
       "select a, b from " + Table.ACIDTBL + " where a <= 5 union all select a, 
b from " + Table.NONACIDORCTBL + " where a >= 5", confForTez);
-    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, 
INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID");
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, 
INPUT__FILE__NAME from " + Table.ACIDNOBUCKET + " order by ROW__ID", 
confForTez);
     LOG.warn("after ctas:");
     for (String s : rs) {
       LOG.warn(s);
     }
     Assert.assertEquals(0, 
BucketCodec.determineVersion(536870912).decodeWriterId(536870912));
     /*
-    * Expected result 0th entry i the RecordIdentifier + data.  1st entry file 
before compact*/
+    * Expected result 0th entry is the RecordIdentifier + data.  1st entry 
file before compact*/
     String expected[][] = {
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", 
"/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", 
"/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", 
"/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", 
"/2/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t7\t8", 
"/2/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":5}\t5\t6", 
"/2/000000_0"},
+      {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", 
"/delta_0000018_0000018_0001/bucket_00000"},
+      {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", 
"/delta_0000018_0000018_0001/bucket_00000"},
+      {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":2}\t5\t6", 
"/delta_0000018_0000018_0001/bucket_00000"},
+      {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", 
"/delta_0000018_0000018_0002/bucket_00000"},
+      {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":1}\t7\t8", 
"/delta_0000018_0000018_0002/bucket_00000"},
+      {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":2}\t5\t6", 
"/delta_0000018_0000018_0002/bucket_00000"},
     };
     Assert.assertEquals("Unexpected row count after ctas", expected.length, 
rs.size());
     //verify data and layout
@@ -441,18 +454,18 @@ public class TestAcidOnTez {
       Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), 
rs.get(i).endsWith(expected[i][1]));
     }
     //perform some Update/Delete
-    runStatementOnDriver("update " + Table.ACIDNOBUCKET + " set a = 70, b  = 
80 where a = 7");
-    runStatementOnDriver("delete from " + Table.ACIDNOBUCKET + " where a = 5");
-    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.ACIDNOBUCKET + " order by ROW__ID");
+    runStatementOnDriver("update " + Table.ACIDNOBUCKET + " set a = 70, b  = 
80 where a = 7", confForTez);
+    runStatementOnDriver("delete from " + Table.ACIDNOBUCKET + " where a = 5", 
confForTez);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.ACIDNOBUCKET + " order by ROW__ID", confForTez);
     LOG.warn("after update/delete:");
     for (String s : rs) {
       LOG.warn(s);
     }
     String[][] expected2 = {
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", 
"/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", 
"/1/000000_0"},
-      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", 
"/2/000000_0"},
-      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t70\t80", 
"delta_0000019_0000019_0000/bucket_00000"}
+      {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", 
"/delta_0000018_0000018_0001/bucket_00000"},
+      {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", 
"/delta_0000018_0000018_0001/bucket_00000"},
+      {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", 
"/delta_0000018_0000018_0002/bucket_00000"},
+      {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t70\t80", 
"delta_0000020_0000020_0000/bucket_00000"}
     };
     Assert.assertEquals("Unexpected row count after update", expected2.length, 
rs.size());
     //verify data and layout
@@ -464,7 +477,7 @@ public class TestAcidOnTez {
     FileSystem fs = FileSystem.get(hiveConf);
     FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
       (Table.ACIDNOBUCKET).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
-    String[] expectedDelDelta = {"delete_delta_0000019_0000019_0000", 
"delete_delta_0000020_0000020_0000"};
+    String[] expectedDelDelta = {"delete_delta_0000020_0000020_0000", 
"delete_delta_0000021_0000021_0000"};
     for(FileStatus stat : status) {
       for(int i = 0; i < expectedDelDelta.length; i++) {
         if(expectedDelDelta[i] != null && 
stat.getPath().toString().endsWith(expectedDelDelta[i])) {
@@ -476,9 +489,9 @@ public class TestAcidOnTez {
       Assert.assertNull("at " + i + " " + expectedDelDelta[i] + " not found on 
disk", expectedDelDelta[i]);
     }
     //run Minor compaction
-    runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 
'minor'");
+    runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 
'minor'", confForTez);
     TestTxnCommands2.runWorker(hiveConf);
-    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.ACIDNOBUCKET + " order by ROW__ID");
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.ACIDNOBUCKET + " order by ROW__ID", confForTez);
     LOG.warn("after compact minor:");
     for (String s : rs) {
       LOG.warn(s);
@@ -493,7 +506,7 @@ public class TestAcidOnTez {
     //check we have right delete delta files after minor compaction
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
       (Table.ACIDNOBUCKET).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER);
-    String[] expectedDelDelta2 = { "delete_delta_0000019_0000019_0000", 
"delete_delta_0000020_0000020_0000", "delete_delta_0000019_0000020"};
+    String[] expectedDelDelta2 = { "delete_delta_0000020_0000020_0000", 
"delete_delta_0000021_0000021_0000", "delete_delta_0000018_0000021"};
     for(FileStatus stat : status) {
       for(int i = 0; i < expectedDelDelta2.length; i++) {
         if(expectedDelDelta2[i] != null && 
stat.getPath().toString().endsWith(expectedDelDelta2[i])) {
@@ -506,9 +519,9 @@ public class TestAcidOnTez {
       Assert.assertNull("at " + i + " " + expectedDelDelta2[i] + " not found 
on disk", expectedDelDelta2[i]);
     }
     //run Major compaction
-    runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 
'major'");
+    runStatementOnDriver("alter table " + Table.ACIDNOBUCKET + " compact 
'major'", confForTez);
     TestTxnCommands2.runWorker(hiveConf);
-    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.ACIDNOBUCKET + " order by ROW__ID");
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " 
+ Table.ACIDNOBUCKET + " order by ROW__ID", confForTez);
     LOG.warn("after compact major:");
     for (String s : rs) {
       LOG.warn(s);
@@ -517,7 +530,178 @@ public class TestAcidOnTez {
     for(int i = 0; i < expected2.length; i++) {
       Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), 
rs.get(i).startsWith(expected2[i][0]));
       //everything is now in base/
-      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), 
rs.get(i).endsWith("base_0000020/bucket_00000"));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), 
rs.get(i).endsWith("base_0000021/bucket_00000"));
+    }
+  }
+  /**
+   * 1. Insert into regular unbucketed table from Union all - union is removed 
and data is placed in
+   * subdirs of target table.
+   * 2. convert to acid table and check data
+   * 3. compact and check data
+   * Compare with {@link #testAcidInsertWithRemoveUnion()} where T is 
transactional=true
+   */
+  @Test
+  public void testInsertWithRemoveUnion() throws Exception {
+    int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}};
+    HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing 
hive conf
+    setupTez(confForTez);
+    runStatementOnDriver("drop table if exists T", confForTez);
+    runStatementOnDriver("create table T (a int, b int) stored as ORC  
TBLPROPERTIES ('transactional'='false')", confForTez);
+    /*
+ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  
~/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505502329802/warehouse/t/.hive-staging_hive_2017-09-15_12-07-33_224_7717909516029836949-1/
+/Users/ekoifman/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505502329802/warehouse/t/.hive-staging_hive_2017-09-15_12-07-33_224_7717909516029836949-1/
+└── -ext-10000
+    ├── HIVE_UNION_SUBDIR_1
+    │   └── 000000_0
+    ├── HIVE_UNION_SUBDIR_2
+    │   └── 000000_0
+    └── HIVE_UNION_SUBDIR_3
+        └── 000000_0
+
+4 directories, 3 files
+     */
+    runStatementOnDriver("insert into T(a,b) select a, b from " + 
Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b 
from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " 
+ Table.ACIDTBL + " where a >= 9", confForTez);
+    List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME 
from T order by a, b, INPUT__FILE__NAME", confForTez);
+    LOG.warn(testName.getMethodName() + ": before converting to acid");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    String[][] expected = {
+      {"1\t2","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + 
"1/000000_0"},
+      {"3\t4","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + 
"1/000000_0"},
+      {"5\t6","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + 
"2/000000_0"},
+      {"7\t8","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + 
"2/000000_0"},
+      {"9\t10","warehouse/t/" + AbstractFileMergeOperator.UNION_SUDBIR_PREFIX 
+ "3/000000_0"}
+    };
+    Assert.assertEquals("Unexpected row count after conversion", 
expected.length, rs.size());
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), 
rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), 
rs.get(i).endsWith(expected[i][1]));
+    }
+    //make the table ACID
+    runStatementOnDriver("alter table T SET TBLPROPERTIES 
('transactional'='true')", confForTez);
+    rs = runStatementOnDriver("select a,b from T order by a, b", confForTez);
+    Assert.assertEquals("After to Acid conversion", 
TestTxnCommands2.stringifyValues(values), rs);
+
+    //run Major compaction
+    runStatementOnDriver("alter table T compact 'major'", confForTez);
+    TestTxnCommands2.runWorker(hiveConf);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from T 
order by ROW__ID", confForTez);
+    LOG.warn(testName.getMethodName() + ": after compact major of T:");
+    for (String s : rs) {
+      LOG.warn(s);
+    }
+    String[][] expected2 = {
+       {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", 
"warehouse/t/base_-9223372036854775808/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", 
"warehouse/t/base_-9223372036854775808/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6", 
"warehouse/t/base_-9223372036854775808/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t7\t8", 
"warehouse/t/base_-9223372036854775808/bucket_00000"},
+      {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10", 
"warehouse/t/base_-9223372036854775808/bucket_00000"}
+    };
+    Assert.assertEquals("Unexpected row count after major compact", 
expected2.length, rs.size());
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), 
rs.get(i).startsWith(expected2[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), 
rs.get(i).endsWith(expected2[i][1]));
+    }
+  }
+  /**
+   * 1. Insert into unbucketed acid table from Union all - union is removed 
and data is placed in
+   * subdirs of target table.
+   * 2. convert to acid table and check data
+   * 3. compact and check data
+   * Compare with {@link #testInsertWithRemoveUnion()} where T is 
transactional=false
+   */
+  @Test
+  public void testAcidInsertWithRemoveUnion() throws Exception {
+    HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing 
hive conf
+    setupTez(confForTez);
+    int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}};
+    runStatementOnDriver("drop table if exists T", confForTez);
+    runStatementOnDriver("create table T (a int, b int) stored as ORC  
TBLPROPERTIES ('transactional'='true')", confForTez);
+    /*On Tez, below (T is transactional), we get the following layout
+ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  
~/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505500035574/warehouse/t/.hive-staging_hive_2017-09-15_11-28-33_960_9111484239090506828-1/
+/Users/ekoifman/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505500035574/warehouse/t/.hive-staging_hive_2017-09-15_11-28-33_960_9111484239090506828-1/
+└── -ext-10000
+    ├── HIVE_UNION_SUBDIR_1
+    │   └── 000000_0
+    │       ├── _orc_acid_version
+    │       └── delta_0000019_0000019_0001
+    │           └── bucket_00000
+    ├── HIVE_UNION_SUBDIR_2
+    │   └── 000000_0
+    │       ├── _orc_acid_version
+    │       └── delta_0000019_0000019_0002
+    │           └── bucket_00000
+    └── HIVE_UNION_SUBDIR_3
+        └── 000000_0
+            ├── _orc_acid_version
+            └── delta_0000019_0000019_0003
+                └── bucket_00000
+
+10 directories, 6 files     */
+    runStatementOnDriver("insert into T(a,b) select a, b from " + 
Table.ACIDTBL + " where a between 1 and 3 union all select a, b from " + 
Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + 
Table.ACIDTBL + " where a >= 9", confForTez);
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, 
INPUT__FILE__NAME from T order by a, b", confForTez);
+    LOG.warn(testName.getMethodName() + ": reading acid table T");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+
+    String[][] expected2 = {
+      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t1\t2", 
"warehouse/t/delta_0000019_0000019_0001/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t3\t4", 
"warehouse/t/delta_0000019_0000019_0001/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870914,\"rowid\":0}\t5\t6", 
"warehouse/t/delta_0000019_0000019_0002/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870914,\"rowid\":1}\t7\t8", 
"warehouse/t/delta_0000019_0000019_0002/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870915,\"rowid\":0}\t9\t10", 
"warehouse/t/delta_0000019_0000019_0003/bucket_00000"}
+    };
+    Assert.assertEquals("Unexpected row count", expected2.length, rs.size());
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), 
rs.get(i).startsWith(expected2[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), 
rs.get(i).endsWith(expected2[i][1]));
+    }
+  }
+  @Test
+  public void testBucketedAcidInsertWithRemoveUnion() throws Exception {
+    HiveConf confForTez = new HiveConf(hiveConf); // make a clone of existing 
hive conf
+    setupTez(confForTez);
+    int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
+    runStatementOnDriver("delete from " + Table.ACIDTBL, confForTez);
+    runStatementOnDriver("insert into " + Table.ACIDTBL + 
TestTxnCommands2.makeValuesClause(values));//make sure both buckets are not 
empty
+    runStatementOnDriver("drop table if exists T", confForTez);
+    /*
+    With bucketed target table Union All is not removed
+
+    ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree  
~/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505510130462/warehouse/t/.hive-staging_hive_2017-09-15_14-16-32_422_4626314315862498838-1/
+/Users/ekoifman/dev/hiverwgit/itests/hive-unit/target/tmp/org.apache.hadoop.hive.ql.TestAcidOnTez-1505510130462/warehouse/t/.hive-staging_hive_2017-09-15_14-16-32_422_4626314315862498838-1/
+└── -ext-10000
+    ├── 000000_0
+    │   ├── _orc_acid_version
+    │   └── delta_0000021_0000021_0000
+    │       └── bucket_00000
+    └── 000001_0
+        ├── _orc_acid_version
+        └── delta_0000021_0000021_0000
+            └── bucket_00001
+
+5 directories, 4 files
+*/
+    runStatementOnDriver("create table T (a int, b int) clustered by (a) into 
2 buckets stored as ORC  TBLPROPERTIES ('transactional'='true')", confForTez);
+    runStatementOnDriver("insert into T(a,b) select a, b from " + 
Table.ACIDTBL + " where a between 1 and 3 union all select a, b from " + 
Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " + 
Table.ACIDTBL + " where a >= 9", confForTez);
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, 
INPUT__FILE__NAME from T order by a, b", confForTez);
+    LOG.warn(testName.getMethodName() + ": reading bucketed acid table T");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    String[][] expected2 = {
+      {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":0}\t1\t2", 
"warehouse/t/delta_0000021_0000021_0000/bucket_00001"},
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t2\t4", 
"warehouse/t/delta_0000021_0000021_0000/bucket_00000"},
+      {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":1}\t5\t6", 
"warehouse/t/delta_0000021_0000021_0000/bucket_00001"},
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t6\t8", 
"warehouse/t/delta_0000021_0000021_0000/bucket_00000"},
+      {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":2}\t9\t10", 
"warehouse/t/delta_0000021_0000021_0000/bucket_00001"}
+    };
+    Assert.assertEquals("Unexpected row count", expected2.length, rs.size());
+    for(int i = 0; i < expected2.length; i++) {
+      Assert.assertTrue("Actual line " + i + " ac: " + rs.get(i), 
rs.get(i).startsWith(expected2[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " ac: " + rs.get(i), 
rs.get(i).endsWith(expected2[i][1]));
     }
   }
   // Ideally test like this should be a qfile test. However, the explain 
output from qfile is always
@@ -613,6 +797,7 @@ public class TestAcidOnTez {
   private List<String> runStatementOnDriver(String stmt, HiveConf conf)
       throws Exception {
     Driver driver = new Driver(conf);
+    driver.setMaxRows(10000);
     CommandProcessorResponse cpr = driver.run(stmt);
     if(cpr.getResponseCode() != 0) {
       throw new RuntimeException(stmt + " failed: " + cpr);
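
To connect the expected ROW__ID bucketid values asserted above (536870913, 536870914, 536936448, ...) with the delta directory names, here is a small stand-alone sketch that decodes them using the V1 bucket-property layout as read off the test data: codec version in the top bits, writer (bucket) id around bit 16, statementId in the low 12 bits. The bit positions and helper names are illustrative assumptions, not a copy of BucketCodec.

public class BucketPropertySketch {
  // Assumed V1 layout (illustrative only): bits 29-31 = codec version,
  // bits 16-27 = writer (bucket) id, bits 0-11 = statement id.
  static int version(int prop)     { return (prop >>> 29) & 0x7; }
  static int writerId(int prop)    { return (prop >>> 16) & 0xFFF; }
  static int statementId(int prop) { return prop & 0xFFF; }

  public static void main(String[] args) {
    int[] samples = {536870912, 536870913, 536870914, 536936448};
    for (int p : samples) {
      System.out.printf("%d -> version=%d writerId=%d statementId=%d%n",
          p, version(p), writerId(p), statementId(p));
    }
    // 536870913 and 536870914 differ only in statementId (1 vs 2), which is
    // why the two union legs above land in delta_..._0001 and delta_..._0002.
  }
}

Under that reading, the expected arrays in the tests are just (writer, statement) pairs plus a per-writer rowid counter, and the union legs stay apart because each leg gets its own statementId.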

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index c66934a..d074701 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -1002,4 +1002,7 @@ public class Context {
   public void setIsUpdateDeleteMerge(boolean isUpdate) {
     this.isUpdateDeleteMerge = isUpdate;
   }
+  public String getExecutionId() {
+    return executionId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 1157e00..b35edfb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1198,7 +1198,11 @@ public class Driver implements CommandProcessor {
       }
       // Set the transaction id in all of the acid file sinks
       if (haveAcidWrite()) {
-        for (FileSinkDesc desc : plan.getAcidSinks()) {
+        List<FileSinkDesc> acidSinks = new ArrayList<>(plan.getAcidSinks());
+        //sorting makes tests easier to write since file names and ROW__IDs 
depend on statementId
+        //so this makes (file name -> data) mapping stable
+        acidSinks.sort((FileSinkDesc fsd1, FileSinkDesc fsd2) -> 
fsd1.getDirName().compareTo(fsd2.getDirName()));
+        for (FileSinkDesc desc : acidSinks) {
           desc.setTransactionId(txnMgr.getCurrentTxnId());
           //it's possible to have > 1 FileSink writing to the same 
table/partition
           //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes
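
The sort added above matters because statementIds are effectively handed out in iteration order over the acid sinks; with an unsorted Set the same query could produce different delta names and ROW__IDs from run to run. A minimal sketch of that idea follows (the directory names and the id assignment are simplified stand-ins, not Driver's actual wiring):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class StableStatementIds {
  public static void main(String[] args) {
    // Hypothetical sink directories produced by two union legs.
    List<String> sinkDirs = new ArrayList<>(
        Arrays.asList("/tmp/-ext-10000/HIVE_UNION_SUBDIR_2",
                      "/tmp/-ext-10000/HIVE_UNION_SUBDIR_1"));
    // Without sorting, ids would follow whatever order the Set happened to use.
    sinkDirs.sort(Comparator.naturalOrder());
    int statementId = 1;
    for (String dir : sinkDirs) {
      // In the real Driver the id ends up in the FileSinkDesc and, from there,
      // in delta_<txn>_<txn>_<stmtId>; here we just print the stable mapping.
      System.out.println(dir + " -> statementId " + statementId++);
    }
  }
}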

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 739ce18..b000745 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -289,10 +289,19 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
           Utilities.LOG14535.info("MoveTask not moving LFD " + sourcePath);
         } else {
           Utilities.LOG14535.info("MoveTask moving LFD " + sourcePath + " to " 
+ targetPath);
-          moveFile(sourcePath, targetPath, lfd.getIsDfsDir());
+          if(lfd.getWriteType() == AcidUtils.Operation.INSERT) {
+            //'targetPath' is table root of un-partitioned table/partition
+            //'sourcePath' result of 'select ...' part of CTAS statement
+            assert lfd.getIsDfsDir();
+            FileSystem srcFs = sourcePath.getFileSystem(conf);
+            List<Path> newFiles = new ArrayList<>();
+            Hive.moveAcidFiles(srcFs, srcFs.globStatus(sourcePath), 
targetPath, newFiles);
+          }
+          else {
+            moveFile(sourcePath, targetPath, lfd.getIsDfsDir());
+          }
         }
       }
-
       // Multi-file load is for dynamic partitions when some partitions do not
       // need to merge and they can simply be moved to the target directory.
       // This is also used for MM table conversion.
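
The INSERT branch added above exists because a CTAS into an ACID table stages its output as <original bucket dir>/delta_x_x_stmt/bucket_N, and those delta directories have to be re-parented directly under the table root instead of being copied as-is. A toy path rewrite showing the intended outcome (this is not the real moveAcidFiles logic; it only needs hadoop-common on the classpath):

import org.apache.hadoop.fs.Path;

public class MoveAcidPathSketch {
  // Toy version of what the move accomplishes: the delta dir and bucket file
  // keep their names, only the original-bucket parent (000000_0) is dropped.
  static Path destFor(Path stagedBucketFile, Path tableRoot) {
    Path deltaDir = stagedBucketFile.getParent();   // e.g. delta_0000014_0000014_0000
    return new Path(new Path(tableRoot, deltaDir.getName()), stagedBucketFile.getName());
  }

  public static void main(String[] args) {
    Path staged = new Path("/warehouse/t/.hive-staging/-ext-10000/000000_0/"
        + "delta_0000014_0000014_0000/bucket_00000");
    System.out.println(destFor(staged, new Path("/warehouse/t")));
    // -> /warehouse/t/delta_0000014_0000014_0000/bucket_00000
  }
}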

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 3de83c6..c2d4612 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -42,6 +42,8 @@ import 
org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -1195,6 +1197,16 @@ public class AcidUtils {
 
     return tableIsTransactional != null && 
tableIsTransactional.equalsIgnoreCase("true");
   }
+  public static boolean isAcidTable(CreateTableDesc table) {
+    if (table == null || table.getTblProps() == null) {
+      return false;
+    }
+    String tableIsTransactional = 
table.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
+    if (tableIsTransactional == null) {
+      tableIsTransactional = 
table.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
+    }
+    return tableIsTransactional != null && 
tableIsTransactional.equalsIgnoreCase("true");
+  }
 
   public static boolean isFullAcidTable(Table table) {
     return isAcidTable(table) && 
!MetaStoreUtils.isInsertOnlyTable(table.getParameters());
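
The new isAcidTable(CreateTableDesc) overload lets the planner recognize an ACID target while the table only exists as a CreateTableDesc (the CTAS case). A self-contained sketch of the same property check, assuming hive_metastoreConstants.TABLE_IS_TRANSACTIONAL resolves to "transactional":

import java.util.HashMap;
import java.util.Map;

public class TransactionalPropCheck {
  // Same shape as the overload above: look the key up as-is, then upper-cased,
  // and treat only a (case-insensitive) "true" as transactional.
  static boolean isTransactional(Map<String, String> tblProps) {
    if (tblProps == null) {
      return false;
    }
    String v = tblProps.get("transactional");
    if (v == null) {
      v = tblProps.get("TRANSACTIONAL");
    }
    return v != null && v.equalsIgnoreCase("true");
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("transactional", "true");
    System.out.println(isTransactional(props));  // true
    System.out.println(isTransactional(null));   // false
  }
}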

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e51e648..d44d081 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -134,6 +134,7 @@ import 
org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
 import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
 import org.apache.hadoop.hive.ql.exec.FunctionTask;
 import org.apache.hadoop.hive.ql.exec.FunctionUtils;
@@ -3612,12 +3613,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
-  private static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path 
dst,
+  public static void moveAcidFiles(FileSystem fs, FileStatus[] stats, Path dst,
                                     List<Path> newFiles) throws HiveException {
     // The layout for ACID files is 
table|partname/base|delta|delete_delta/bucket
-    // We will always only be writing delta files.  In the buckets created by 
FileSinkOperator
-    // it will look like bucket/delta|delete_delta/bucket.  So we need to move 
that into
-    // the above structure. For the first mover there will be no delta 
directory,
+    // We will always only be writing delta files ( except IOW which writes 
base_X/ ).
+    // In the buckets created by FileSinkOperator
+    // it will look like original_bucket/delta|delete_delta/bucket
+    // (e.g. .../-ext-10004/000000_0/delta_0000014_0000014_0000/bucket_00000). 
 So we need to
+    // move that into the above structure. For the first mover there will be 
no delta directory,
     // so we can move the whole directory.
     // For everyone else we will need to just move the buckets under the 
existing delta
     // directory.
@@ -3632,6 +3635,36 @@ private void constructOneLBLocationMap(FileStatus fSta,
       FileStatus[] origBucketStats = null;
       try {
         origBucketStats = fs.listStatus(srcPath, 
AcidUtils.originalBucketFilter);
+        if(origBucketStats == null || origBucketStats.length == 0) {
+          /**
+           check if we are dealing with data with non-standard layout. For 
example a write
+           produced by a (optimized) Union All query
+           which looks like
+          └── -ext-10000
+            ├── HIVE_UNION_SUBDIR_1
+            │   └── 000000_0
+            │       ├── _orc_acid_version
+            │       └── delta_0000019_0000019_0001
+            │           └── bucket_00000
+            ├── HIVE_UNION_SUBDIR_2
+            │   └── 000000_0
+            │       ├── _orc_acid_version
+            │       └── delta_0000019_0000019_0002
+            │           └── bucket_00000
+           The assumption is that we either have all data in subdirs or root 
of srcPath
+           but not both.
+           For Union case, we expect delta dirs to have unique names which is 
assured by
+           {@link org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor}
+          */
+          FileStatus[] unionSubdirs = fs.globStatus(new Path(srcPath,
+            AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "[0-9]*"));
+          List<FileStatus> buckets = new ArrayList<>();
+          for(FileStatus unionSubdir : unionSubdirs) {
+            Collections.addAll(buckets,
+              fs.listStatus(unionSubdir.getPath(), 
AcidUtils.originalBucketFilter));
+          }
+          origBucketStats = buckets.toArray(new FileStatus[buckets.size()]);
+        }
       } catch (IOException e) {
         String msg = "Unable to look for bucket files in src path " + 
srcPath.toUri().toString();
         LOG.error(msg);
@@ -3645,7 +3678,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
                 fs, dst, origBucketPath, createdDeltaDirs, newFiles);
         moveAcidFiles(AcidUtils.DELETE_DELTA_PREFIX, 
AcidUtils.deleteEventDeltaDirFilter,
                 fs, dst,origBucketPath, createdDeltaDirs, newFiles);
-        moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter,
+        moveAcidFiles(AcidUtils.BASE_PREFIX, AcidUtils.baseFileFilter,//for 
Insert Overwrite
                 fs, dst, origBucketPath, createdDeltaDirs, newFiles);
       }
     }
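
The fallback added to moveAcidFiles above kicks in when no original bucket directories sit directly under srcPath; it then looks one level down inside the HIVE_UNION_SUBDIR_<n> directories. A rough, runnable approximation of that scan (the literal prefix stands in for AbstractFileMergeOperator.UNION_SUDBIR_PREFIX, and the real code additionally applies AcidUtils.originalBucketFilter):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class UnionSubdirScanSketch {
  // Rough equivalent of the fallback: when no buckets sit directly under
  // srcPath, collect them from the HIVE_UNION_SUBDIR_<n> children instead.
  static FileStatus[] bucketsUnderUnionSubdirs(FileSystem fs, Path srcPath) throws Exception {
    FileStatus[] unionSubdirs = fs.globStatus(new Path(srcPath, "HIVE_UNION_SUBDIR_[0-9]*"));
    List<FileStatus> buckets = new ArrayList<>();
    if (unionSubdirs != null) {
      for (FileStatus subdir : unionSubdirs) {
        Collections.addAll(buckets, fs.listStatus(subdir.getPath()));
      }
    }
    return buckets.toArray(new FileStatus[0]);
  }

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    for (FileStatus st : bucketsUnderUnionSubdirs(fs, new Path("/tmp/-ext-10000"))) {
      System.out.println(st.getPath());
    }
  }
}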

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
new file mode 100644
index 0000000..b5bc386
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/QueryPlanPostProcessor.java
@@ -0,0 +1,166 @@
+package org.apache.hadoop.hive.ql.optimizer;
+
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.ReplLoadWork;
+import org.apache.hadoop.hive.ql.index.IndexMetadataChangeWork;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.parse.GenTezProcContext;
+import org.apache.hadoop.hive.ql.parse.GenTezWork;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkWork;
+import org.apache.hadoop.hive.ql.plan.ArchiveWork;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.CopyWork;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
+import org.apache.hadoop.hive.ql.plan.ExplainSQRewriteWork;
+import org.apache.hadoop.hive.ql.plan.ExplainWork;
+import org.apache.hadoop.hive.ql.plan.FetchWork;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.FunctionWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Finds Acid FileSinkDesc objects which can be created in the physical 
(disconnected) plan, e.g.
+ * {@link 
org.apache.hadoop.hive.ql.parse.GenTezUtils#removeUnionOperators(GenTezProcContext,
 BaseWork, int)}
+ * so that statementId can be properly assigned to ensure unique ROW__IDs
+ * {@link org.apache.hadoop.hive.ql.optimizer.unionproc.UnionProcFactory} is 
another example where
+ * Union All optimizations create new FileSinkDescS
+ */
+public class QueryPlanPostProcessor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(QueryPlanPostProcessor.class);
+
+  public QueryPlanPostProcessor(List<Task<?>> rootTasks, Set<FileSinkDesc> 
acidSinks, String executionId) {
+    for(Task<?> t : rootTasks) {
+      //Work
+      Object work = t.getWork();
+      if(work instanceof TezWork) {
+        for(BaseWork bw : ((TezWork)work).getAllWorkUnsorted()) {
+          collectFileSinkDescs(bw.getAllLeafOperators(), acidSinks);
+        }
+      }
+      else if(work instanceof BaseWork) {
+        collectFileSinkDescs(((BaseWork)work).getAllLeafOperators(), 
acidSinks);
+      }
+      else if(work instanceof MapredWork) {
+        MapredWork w = (MapredWork)work;
+        if(w.getMapWork() != null) {
+          collectFileSinkDescs(w.getMapWork().getAllLeafOperators(), 
acidSinks);
+        }
+        if(w.getReduceWork() != null) {
+          collectFileSinkDescs(w.getReduceWork().getAllLeafOperators(), 
acidSinks);
+        }
+      }
+      else if(work instanceof SparkWork) {
+        for(BaseWork bw : ((SparkWork)work).getRoots()) {
+          collectFileSinkDescs(bw.getAllLeafOperators(), acidSinks);
+        }
+      }
+      else if(work instanceof MapredLocalWork) {
+        //I don't think this can have any FileSinkOperatorS - more future 
proofing
+        Set<FileSinkOperator> fileSinkOperatorSet = 
OperatorUtils.findOperators(((MapredLocalWork)work).getAliasToWork().values(), 
FileSinkOperator.class);
+        for(FileSinkOperator fsop : fileSinkOperatorSet) {
+          collectFileSinkDescs(fsop, acidSinks);
+        }
+      }
+      else if(work instanceof ExplainWork) {
+        new QueryPlanPostProcessor(((ExplainWork)work).getRootTasks(), 
acidSinks, executionId);
+      }
+      /*
+      ekoifman:~ ekoifman$ cd dev/hiverwgit/ql/src/java/org/apache/
+ekoifman:apache ekoifman$ find . -name *Work.java
+./hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+./hadoop/hive/ql/exec/repl/ReplDumpWork.java
+./hadoop/hive/ql/exec/repl/ReplStateLogWork.java
+./hadoop/hive/ql/index/IndexMetadataChangeWork.java
+./hadoop/hive/ql/io/merge/MergeFileWork.java - extends MapWork
+./hadoop/hive/ql/io/rcfile/stats/PartialScanWork.java - extends MapWork
+./hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateWork.java - extends MapWork
+./hadoop/hive/ql/parse/GenTezWork.java
+./hadoop/hive/ql/parse/spark/GenSparkWork.java
+./hadoop/hive/ql/plan/ArchiveWork.java
+./hadoop/hive/ql/plan/BaseWork.java
+./hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
+./hadoop/hive/ql/plan/ColumnStatsWork.java
+./hadoop/hive/ql/plan/ConditionalWork.java
+./hadoop/hive/ql/plan/CopyWork.java
+./hadoop/hive/ql/plan/DDLWork.java
+./hadoop/hive/ql/plan/DependencyCollectionWork.java
+./hadoop/hive/ql/plan/ExplainSQRewriteWork.java
+./hadoop/hive/ql/plan/ExplainWork.java
+./hadoop/hive/ql/plan/FetchWork.java
+./hadoop/hive/ql/plan/FunctionWork.java
+./hadoop/hive/ql/plan/MapredLocalWork.java
+./hadoop/hive/ql/plan/MapredWork.java
+./hadoop/hive/ql/plan/MapWork.java - extends BaseWork
+./hadoop/hive/ql/plan/MergeJoinWork.java - extends BaseWork
+./hadoop/hive/ql/plan/MoveWork.java
+./hadoop/hive/ql/plan/ReduceWork.java
+./hadoop/hive/ql/plan/ReplCopyWork.java - extends CopyWork
+./hadoop/hive/ql/plan/SparkWork.java
+./hadoop/hive/ql/plan/StatsNoJobWork.java
+./hadoop/hive/ql/plan/StatsWork.java
+./hadoop/hive/ql/plan/TezWork.java
+./hadoop/hive/ql/plan/UnionWork.java - extends BaseWork
+      */
+      else if(work instanceof ReplLoadWork ||
+        work instanceof ReplStateLogWork ||
+        work instanceof IndexMetadataChangeWork ||
+        work instanceof GenTezWork ||
+        work instanceof GenSparkWork ||
+        work instanceof ArchiveWork ||
+        work instanceof ColumnStatsUpdateWork ||
+        work instanceof ColumnStatsWork ||
+        work instanceof ConditionalWork ||
+        work instanceof CopyWork ||
+        work instanceof DDLWork ||
+        work instanceof DependencyCollectionWork ||
+        work instanceof ExplainSQRewriteWork ||
+        work instanceof FetchWork ||
+        work instanceof FunctionWork ||
+        work instanceof MoveWork ||
+        work instanceof StatsNoJobWork ||
+        work instanceof StatsWork) {
+        LOG.debug("Found " + work.getClass().getName() + " - no 
FileSinkOperation can be present.  executionId=" + executionId);
+      }
+      else {
+        //if here, someone must have added new Work object - should it be 
walked to find FileSinks?
+        throw new IllegalArgumentException("Unexpected Work object: " + 
work.getClass() + " executionId=" + executionId);
+      }
+    }
+  }
+  private void collectFileSinkDescs(Operator<?> leaf, Set<FileSinkDesc> 
acidSinks) {
+    if(leaf instanceof FileSinkOperator) {
+      FileSinkDesc fsd = ((FileSinkOperator) leaf).getConf();
+      if(fsd.getWriteType() != AcidUtils.Operation.NOT_ACID) {
+        if(acidSinks.add(fsd)) {
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("Found Acid Sink: " + fsd.getDirName());
+          }
+        }
+      }
+    }
+  }
+  private void collectFileSinkDescs(Set<Operator<?>> leaves, Set<FileSinkDesc> 
acidSinks) {
+    for(Operator<?> leaf : leaves) {
+      collectFileSinkDescs(leaf, acidSinks);
+    }
+  }
+}
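
The constructor above is meant to run once the physical plan exists, so that FileSinkDescs cloned in by union/Tez rewrites still receive a transaction id and statementId. The exact call site is not part of the hunks shown here, so the wrapper below is only a sketch of how a caller might use it; the input names are illustrative:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;

public class PostProcessUsageSketch {
  // Hypothetical wrapper around the call the compiler is expected to make once
  // root tasks have been generated.
  static Set<FileSinkDesc> collectAcidSinks(List<Task<?>> rootTasks, String executionId) {
    Set<FileSinkDesc> acidSinks = new HashSet<>();
    new QueryPlanPostProcessor(rootTasks, acidSinks, executionId);
    // Driver.run() later sorts these by directory name and stamps each one
    // with the current transaction id and a per-sink statementId.
    return acidSinks;
  }
}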

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
index 3a38a6d..bc26c5e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Stack;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -220,7 +221,7 @@ public final class UnionProcFactory {
 
         for (Operator<? extends OperatorDesc> parent : parents) {
           FileSinkDesc fileSinkDesc = (FileSinkDesc) fileSinkOp.getConf().clone();
-          fileSinkDesc.setDirName(new Path(parentDirName, parent.getIdentifier()));
+          fileSinkDesc.setDirName(new Path(parentDirName, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + parent.getIdentifier()));
           fileSinkDesc.setLinkedFileSink(true);
           Utilities.LOG14535.info("Created LinkedFileSink for union " + fileSinkDesc.getDirName() + "; parent " + parentDirName);
           parent.setChildOperators(null);
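The constant used above, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX, resolves to "HIVE_UNION_SUBDIR_" judging by the staging tree shown in the new TestTxnNoBuckets test further down. The snippet below is illustrative only; the staging directory and the identifiers are made up.

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;

  public class UnionSubdirDemo {
    public static void main(String[] args) {
      // Each branch of a flattened UNION ALL now writes to a named subdirectory of
      // the parent FileSink instead of a bare numeric child.
      Path parentDirName = new Path("/tmp/hive/-ext-10000");
      Path branch0 = new Path(parentDirName, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "19");
      Path branch1 = new Path(parentDirName, AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "20");
      System.out.println(branch0);  // /tmp/hive/-ext-10000/HIVE_UNION_SUBDIR_19
      System.out.println(branch1);  // /tmp/hive/-ext-10000/HIVE_UNION_SUBDIR_20
    }
  }

The prefixed subdirectories are what the acid move logic can later recognize and fold into per-statement delta directories, as the staging tree in testInsertToAcidWithUnionRemove shows.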

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index f50b57e..3475c7c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -131,6 +131,7 @@ import 
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.optimizer.Optimizer;
+import org.apache.hadoop.hive.ql.optimizer.QueryPlanPostProcessor;
 import org.apache.hadoop.hive.ql.optimizer.Transform;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException;
 import org.apache.hadoop.hive.ql.optimizer.calcite.CalciteSemanticException.UnsupportedFeature;
@@ -7003,6 +7004,8 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         viewDesc.setSchema(new ArrayList<FieldSchema>(field_schemas));
       }
 
+      destTableIsAcid = tblDesc != null && AcidUtils.isAcidTable(tblDesc);
+
       boolean isDestTempFile = true;
       if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) {
         idToTableNameMap.put(String.valueOf(destTableId), dest_path.toUri().toString());
@@ -7013,9 +7016,8 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
       boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE);
       // Create LFD even for MM CTAS - it's a no-op move, but it still seems to be used for stats.
-      loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc,
-          queryTmpdir, dest_path, isDfsDir, cols, colTypes, isMmCtas));
-
+      loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, dest_path, isDfsDir, cols,
+        colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID, isMmCtas));
       if (tblDesc == null) {
         if (viewDesc != null) {
           table_desc = PlanUtils.getTableDesc(viewDesc, cols, colTypes);
@@ -7104,7 +7106,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part,
         dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
-        canBeMerged, dest_tab, txnId, isMmCtas);
+        canBeMerged, dest_tab, txnId, isMmCtas, dest_type);
     if (isMmCtas) {
       // Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
       tableDesc.setWriter(fileSinkDesc);
@@ -7212,7 +7214,8 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       boolean destTableIsAcid, boolean destTableIsTemporary,
       boolean destTableIsMaterialization, Path queryTmpdir,
       SortBucketRSCtx rsCtx, DynamicPartitionCtx dpCtx, ListBucketingCtx lbCtx,
-      RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas) throws SemanticException {
+      RowSchema fsRS, boolean canBeMerged, Table dest_tab, Long mmWriteId, boolean isMmCtas,
+      Integer dest_type) throws SemanticException {
     FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc,
         conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(),
         canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx,
@@ -7236,10 +7239,22 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
           (deleting(dest) ? AcidUtils.Operation.DELETE : AcidUtils.Operation.INSERT);
       fileSinkDesc.setWriteType(wt);
 
-      String destTableFullName = dest_tab.getCompleteName().replace('@', '.');
-      Map<String, ASTNode> iowMap = qb.getParseInfo().getInsertOverwriteTables();
-      if (iowMap.containsKey(destTableFullName)) {
-        fileSinkDesc.setInsertOverwrite(true);
+      switch (dest_type) {
+        case QBMetaData.DEST_PARTITION:
+          //fall through
+        case QBMetaData.DEST_TABLE:
+          //INSERT [OVERWRITE] path
+          String destTableFullName = dest_tab.getCompleteName().replace('@', '.');
+          Map<String, ASTNode> iowMap = qb.getParseInfo().getInsertOverwriteTables();
+          if (iowMap.containsKey(destTableFullName)) {
+            fileSinkDesc.setInsertOverwrite(true);
+          }
+          if (iowMap.containsKey(destTableFullName)) {
+            fileSinkDesc.setInsertOverwrite(true);
+          }
+          break;
+        case QBMetaData.DEST_DFS_FILE:
+          //CTAS path
+          break;
+        default:
+          throw new IllegalStateException("Unexpected dest_type=" + dest_tab);
       }
       acidFileSinks.add(fileSinkDesc);
     }
@@ -11555,6 +11570,8 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
       compiler.compile(pCtx, rootTasks, inputs, outputs);
       fetchTask = pCtx.getFetchTask();
     }
+    //find all Acid FileSinkOperatorS
+    QueryPlanPostProcessor qp = new QueryPlanPostProcessor(rootTasks, acidFileSinks, ctx.getExecutionId());
     LOG.info("Completed plan generation");
 
     // 10. put accessed columns to readEntity
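A condensed restatement of the new dest_type branch above, as the editor reads it: only table and partition destinations are checked against the INSERT OVERWRITE map, while the CTAS/DFS-file destination skips the check entirely. The helper below is a paraphrase for illustration using the names from the hunk, not code from the commit.

  private static boolean isInsertOverwrite(Integer destType, Table destTab, QBParseInfo parseInfo) {
    switch (destType) {
      case QBMetaData.DEST_PARTITION:
        //fall through
      case QBMetaData.DEST_TABLE:
        // INSERT [OVERWRITE] path: look the target table up in the IOW map
        String destTableFullName = destTab.getCompleteName().replace('@', '.');
        return parseInfo.getInsertOverwriteTables().containsKey(destTableFullName);
      case QBMetaData.DEST_DFS_FILE:
        // CTAS path: can never be an INSERT OVERWRITE
        return false;
      default:
        throw new IllegalStateException("Unexpected dest_type=" + destType);
    }
  }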

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
index d708df3..45d4fb0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan;
 import java.io.Serializable;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
 /**
@@ -30,12 +31,15 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
 public class LoadDesc implements Serializable {
   private static final long serialVersionUID = 1L;
   private Path sourcePath;
+  /**
+   * Need to remember whether this is an acid compliant operation, and if so whether it is an
+   * insert, update, or delete.
+   */
+  private final AcidUtils.Operation writeType;
 
-  public LoadDesc() {
-  }
-
-  public LoadDesc(final Path sourcePath) {
+  public LoadDesc(final Path sourcePath, AcidUtils.Operation writeType) {
     this.sourcePath = sourcePath;
+    this.writeType = writeType;
   }
 
   @Explain(displayName = "source", explainLevels = { Level.EXTENDED })
@@ -46,4 +50,14 @@ public class LoadDesc implements Serializable {
   public void setSourcePath(Path sourcePath) {
     this.sourcePath = sourcePath;
   }
+
+  public AcidUtils.Operation getWriteType() {
+    return writeType;
+  }
+
+  @Explain(displayName = "Write Type")
+  public String getWriteTypeString() {
+    //if acid write, add to plan output, else don't bother
+    return getWriteType() == AcidUtils.Operation.NOT_ACID ? null : getWriteType().toString();
+  }
 }
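A quick, self-contained illustration of the defaulting this enables (constructor signature as kept in the LoadFileDesc hunk below; the paths and column strings are made up): descs built through the old-style constructor stay NOT_ACID, so getWriteTypeString() returns null and EXPLAIN output for plain loads is unchanged.

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.plan.LoadFileDesc;

  public class WriteTypeExplainDemo {
    public static void main(String[] args) {
      // old-style constructor: write type defaults to AcidUtils.Operation.NOT_ACID
      LoadFileDesc lfd = new LoadFileDesc(new Path("/tmp/stage"), new Path("/tmp/target"),
          true, "a,b", "int:int", /*isMmCtas*/ false);
      System.out.println(lfd.getWriteType());        // NOT_ACID
      System.out.println(lfd.getWriteTypeString());  // null -> no "Write Type" line in the plan
    }
  }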

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
index 6fad710..0032648 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.hive.ql.plan;
 import java.io.Serializable;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.PTFUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 
 /**
  * LoadFileDesc.
@@ -38,11 +38,8 @@ public class LoadFileDesc extends LoadDesc implements 
Serializable {
   private String destinationCreateTable;
   private boolean isMmCtas;
 
-  public LoadFileDesc() {
-  }
-
   public LoadFileDesc(final LoadFileDesc o) {
-    super(o.getSourcePath());
+    super(o.getSourcePath(), o.getWriteType());
 
     this.targetDir = o.targetDir;
     this.isDfsDir = o.isDfsDir;
@@ -54,8 +51,8 @@ public class LoadFileDesc extends LoadDesc implements 
Serializable {
 
   public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc  createViewDesc,
                       final Path sourcePath, final Path targetDir, final boolean isDfsDir,
-                      final String columns, final String columnTypes, boolean isMmCtas) {
-    this(sourcePath, targetDir, isDfsDir, columns, columnTypes, isMmCtas);
+                      final String columns, final String columnTypes, AcidUtils.Operation writeType, boolean isMmCtas) {
+   this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType, isMmCtas);
     if (createTableDesc != null && createTableDesc.getDatabaseName() != null
         && createTableDesc.getTableName() != null) {
       destinationCreateTable = (createTableDesc.getTableName().contains(".") ? "" : createTableDesc
@@ -68,9 +65,14 @@ public class LoadFileDesc extends LoadDesc implements 
Serializable {
     }
   }
 
-  public LoadFileDesc(final Path sourcePath, final Path targetDir, final boolean isDfsDir,
-      final String columns, final String columnTypes, boolean isMmCtas) {
-    super(sourcePath);
+  public LoadFileDesc(final Path sourcePath, final Path targetDir,
+                      final boolean isDfsDir, final String columns, final String columnTypes, boolean isMmCtas) {
+    this(sourcePath, targetDir, isDfsDir, columns, columnTypes, AcidUtils.Operation.NOT_ACID, isMmCtas);
+  }
+  private LoadFileDesc(final Path sourcePath, final Path targetDir,
+      final boolean isDfsDir, final String columns,
+      final String columnTypes, AcidUtils.Operation writeType, boolean isMmCtas) {
+    super(sourcePath, writeType);
+    Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + targetDir);
     this.targetDir = targetDir;
     this.isDfsDir = isDfsDir;

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 3201dc9..e893ab5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -32,17 +32,13 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
  * LoadTableDesc.
  *
  */
-public class LoadTableDesc extends org.apache.hadoop.hive.ql.plan.LoadDesc
-    implements Serializable {
+public class LoadTableDesc extends LoadDesc implements Serializable {
   private static final long serialVersionUID = 1L;
   private boolean replace;
   private DynamicPartitionCtx dpCtx;
   private ListBucketingCtx lbCtx;
   private boolean inheritTableSpecs = true; //For partitions, flag controlling whether the current
                                             //table specs are to be used
-  // Need to remember whether this is an acid compliant operation, and if so whether it is an
-  // insert, update, or delete.
-  private AcidUtils.Operation writeType;
   private Long txnId;
   private int stmtId;
 
@@ -52,13 +48,12 @@ public class LoadTableDesc extends 
org.apache.hadoop.hive.ql.plan.LoadDesc
   private boolean commitMmWriteId = true;
 
   public LoadTableDesc(final LoadTableDesc o) {
-    super(o.getSourcePath());
+    super(o.getSourcePath(), o.getWriteType());
 
     this.replace = o.replace;
     this.dpCtx = o.dpCtx;
     this.lbCtx = o.lbCtx;
     this.inheritTableSpecs = o.inheritTableSpecs;
-    this.writeType = o.writeType;
     this.table = o.table;
     this.partitionSpec = o.partitionSpec;
   }
@@ -67,11 +62,12 @@ public class LoadTableDesc extends 
org.apache.hadoop.hive.ql.plan.LoadDesc
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
       final boolean replace,
-      final AcidUtils.Operation writeType, Long txnId) {
-    super(sourcePath);
+      final AcidUtils.Operation writeType,
+      Long txnId) {
+    super(sourcePath, writeType);
     Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to "
         + ((table.getProperties() == null) ? "null" : table.getTableName()));
-    init(table, partitionSpec, replace, writeType, txnId);
+    init(table, partitionSpec, replace, txnId);
   }
 
   /**
@@ -113,13 +109,13 @@ public class LoadTableDesc extends 
org.apache.hadoop.hive.ql.plan.LoadDesc
       final DynamicPartitionCtx dpCtx,
       final AcidUtils.Operation writeType,
       boolean isReplace, Long txnId) {
-    super(sourcePath);
+    super(sourcePath, writeType);
     Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + 
table.getTableName()/*, new Exception()*/);
     this.dpCtx = dpCtx;
     if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) {
-      init(table, dpCtx.getPartSpec(), isReplace, writeType, txnId);
+      init(table, dpCtx.getPartSpec(), isReplace, txnId);
     } else {
-      init(table, new LinkedHashMap<String, String>(), isReplace, writeType, txnId);
+      init(table, new LinkedHashMap<String, String>(), isReplace, txnId);
     }
   }
 
@@ -127,11 +123,10 @@ public class LoadTableDesc extends 
org.apache.hadoop.hive.ql.plan.LoadDesc
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
       final boolean replace,
-      AcidUtils.Operation writeType, Long txnId) {
+      Long txnId) {
     this.table = table;
     this.partitionSpec = partitionSpec;
     this.replace = replace;
-    this.writeType = writeType;
     this.txnId = txnId;
   }
 
@@ -201,10 +196,6 @@ public class LoadTableDesc extends 
org.apache.hadoop.hive.ql.plan.LoadDesc
     this.lbCtx = lbCtx;
   }
 
-  public AcidUtils.Operation getWriteType() {
-    return writeType;
-  }
-
   public Long getTxnId() {
     return txnId;
   }
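With writeType hoisted into LoadDesc, LoadTableDesc no longer carries its own field; the value is fixed at construction time and survives the copy constructor. A small sketch, assuming TableDesc's no-argument constructor and the six-argument LoadTableDesc constructor shown in the hunks above; the paths and ids are made up.

  import java.util.LinkedHashMap;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
  import org.apache.hadoop.hive.ql.plan.TableDesc;

  public class LoadTableWriteTypeDemo {
    public static void main(String[] args) {
      LoadTableDesc ltd = new LoadTableDesc(new Path("/tmp/delta_stage"), new TableDesc(),
          new LinkedHashMap<String, String>(), /*replace*/ false,
          AcidUtils.Operation.UPDATE, /*txnId*/ 42L);
      LoadTableDesc copy = new LoadTableDesc(ltd);  // copy ctor forwards o.getWriteType()
      System.out.println(ltd.getWriteType());       // UPDATE, now answered by the LoadDesc base
      System.out.println(copy.getWriteType());      // UPDATE
    }
  }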

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
index b695f0f..bbed9be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
@@ -22,12 +22,12 @@ import org.apache.hadoop.hive.conf.HiveConf;
 
 public class TezEdgeProperty {
 
-  public enum EdgeType {
-    SIMPLE_EDGE,
+  public enum EdgeType {//todo: HIVE-15549
+    SIMPLE_EDGE,//SORT_PARTITION_EDGE
     BROADCAST_EDGE,
-    CONTAINS,
-    CUSTOM_EDGE,
-    CUSTOM_SIMPLE_EDGE,
+    CONTAINS,//used for union (all?)
+    CUSTOM_EDGE,//CO_PARTITION_EDGE
+    CUSTOM_SIMPLE_EDGE,//PARTITION_EDGE
     ONE_TO_ONE_EDGE
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/5b3c7979/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index 1f0c269..880329d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -5,7 +5,10 @@ import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -20,6 +23,8 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests 
{
     File.separator + TestTxnNoBuckets.class.getCanonicalName()
     + "-" + System.currentTimeMillis()
   ).getPath().replaceAll("\\\\", "/");
+  @Rule
+  public TestName testName = new TestName();
   @Override
   String getTestDataDir() {
     return TEST_DATA_DIR;
@@ -65,17 +70,7 @@ public class TestTxnNoBuckets extends 
TxnCommandsBaseForTests {
     Assert.assertTrue(rs.get(2), rs.get(2).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001"));
     Assert.assertTrue(rs.get(3), rs.get(3).startsWith("{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2\t"));
     Assert.assertTrue(rs.get(3), rs.get(3).endsWith("nobuckets/delta_0000019_0000019_0000/bucket_00001"));
-    /*todo: WTF?
-    RS for update seems to spray randomly... is that OK?  maybe as long as all 
resultant files have different names... will they?
-    Assuming we name them based on taskId, we should create bucketX and 
bucketY.
-    we delete events can be written to bucketX file it could be useful for 
filter delete for a split by file name since the insert
-    events seem to be written to a proper bucketX file.  In fact this may 
reduce the number of changes elsewhere like compactor... maybe
-    But this limits the parallelism - what is worse, you don't know what the 
parallelism should be until you have a list of all the
-    input files since bucket count is no longer a metadata property.  Also, 
with late Update split, the file name has already been determined
-    from taskId so the Insert part won't end up matching the bucketX property 
necessarily.
-    With early Update split, the Insert can still be an insert - i.e. go to 
appropriate bucketX.  But deletes will still go wherever (random shuffle)
-    unless you know all the bucketX files to be read - may not be worth the 
trouble.
-    * 2nd: something in FS fails.  ArrayIndexOutOfBoundsException: 1 at 
FileSinkOperator.process(FileSinkOperator.java:779)*/
+
     runStatementOnDriver("update nobuckets set c3 = 17 where c3 in(0,1)");
     rs = runStatementOnDriver("select ROW__ID, c1, c2, c3, INPUT__FILE__NAME 
from nobuckets order by INPUT__FILE__NAME, ROW__ID");
     LOG.warn("after update");
@@ -152,15 +147,6 @@ public class TestTxnNoBuckets extends 
TxnCommandsBaseForTests {
   }
 
   /**
-   * all of these pass but don't do exactly the right thing
-   * files land as if it's not an acid table "warehouse/myctas4/000000_0"
-   * even though in {@link 
org.apache.hadoop.hive.metastore.TransactionalValidationListener} fires
-   * and sees it as transactional table
-   * look for QB.isCTAS() and CreateTableDesc() in SemanticAnalyzer
-   *
-   * On read, these files are treated like non acid to acid conversion
-   *
-   * see HIVE-15899
    * See CTAS tests in TestAcidOnTez
    */
   @Test
@@ -169,30 +155,177 @@ public class TestTxnNoBuckets extends 
TxnCommandsBaseForTests {
     runStatementOnDriver("insert into " + Table.NONACIDORCTBL +  
makeValuesClause(values));
     runStatementOnDriver("create table myctas stored as ORC TBLPROPERTIES 
('transactional" +
       "'='true', 'transactional_properties'='default') as select a, b from " + 
Table.NONACIDORCTBL);
-    List<String> rs = runStatementOnDriver("select * from myctas order by a, 
b");
-    Assert.assertEquals(stringifyValues(values), rs);
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, 
INPUT__FILE__NAME from myctas order by ROW__ID");
+    String expected[][] = {
+      {"{\"transactionid\":14,\"bucketid\":536870912,\"rowid\":0}\t3\t4", 
"warehouse/myctas/delta_0000014_0000014_0000/bucket_00000"},
+      {"{\"transactionid\":14,\"bucketid\":536870912,\"rowid\":1}\t1\t2", 
"warehouse/myctas/delta_0000014_0000014_0000/bucket_00000"},
+    };
+    checkExpected(rs, expected, "Unexpected row count after ctas from non acid 
table");
 
     runStatementOnDriver("insert into " + Table.ACIDTBL + 
makeValuesClause(values));
     runStatementOnDriver("create table myctas2 stored as ORC TBLPROPERTIES 
('transactional" +
       "'='true', 'transactional_properties'='default') as select a, b from " + 
Table.ACIDTBL);
-    rs = runStatementOnDriver("select * from myctas2 order by a, b");
-    Assert.assertEquals(stringifyValues(values), rs);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from 
myctas2 order by ROW__ID");
+    String expected2[][] = {
+      {"{\"transactionid\":17,\"bucketid\":536870912,\"rowid\":0}\t3\t4", 
"warehouse/myctas2/delta_0000017_0000017_0000/bucket_00000"},
+      {"{\"transactionid\":17,\"bucketid\":536870912,\"rowid\":1}\t1\t2", 
"warehouse/myctas2/delta_0000017_0000017_0000/bucket_00000"},
+    };
+    checkExpected(rs, expected2, "Unexpected row count after ctas from acid 
table");
 
     runStatementOnDriver("create table myctas3 stored as ORC TBLPROPERTIES 
('transactional" +
       "'='true', 'transactional_properties'='default') as select a, b from " + 
Table.NONACIDORCTBL +
       " union all select a, b from " + Table.ACIDTBL);
-    rs = runStatementOnDriver("select * from myctas3 order by a, b");
-    Assert.assertEquals(stringifyValues(new int[][] 
{{1,2},{1,2},{3,4},{3,4}}), rs);
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from 
myctas3 order by ROW__ID");
+    String expected3[][] = {
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":0}\t3\t4", 
"warehouse/myctas3/delta_0000019_0000019_0000/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536870912,\"rowid\":1}\t1\t2", 
"warehouse/myctas3/delta_0000019_0000019_0000/bucket_00000"},
+      {"{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":0}\t3\t4", 
"warehouse/myctas3/delta_0000019_0000019_0000/bucket_00001"},
+      {"{\"transactionid\":19,\"bucketid\":536936448,\"rowid\":1}\t1\t2", 
"warehouse/myctas3/delta_0000019_0000019_0000/bucket_00001"},
+    };
+    checkExpected(rs, expected3, "Unexpected row count after ctas from union 
all query");
 
     runStatementOnDriver("create table myctas4 stored as ORC TBLPROPERTIES 
('transactional" +
       "'='true', 'transactional_properties'='default') as select a, b from " + 
Table.NONACIDORCTBL +
       " union distinct select a, b from " + Table.ACIDTBL);
-    rs = runStatementOnDriver("select * from myctas4 order by a, b");
+    rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from 
myctas4 order by ROW__ID");
+    String expected4[][] = {
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t1\t2", 
"/delta_0000021_0000021_0000/bucket_00000"},
+      {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t3\t4", 
"/delta_0000021_0000021_0000/bucket_00000"},
+    };
+    checkExpected(rs, expected4, "Unexpected row count after ctas from union 
distinct query");
+  }
+  /**
+   * Insert into unbucketed acid table from union all query
+   * Union All is flattened so nested subdirs are created and the acid move drops them since
+   * delta dirs have unique names
+   */
+  @Test
+  public void testInsertToAcidWithUnionRemove() throws Exception {
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_OPTIMIZE_UNION_REMOVE, true);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    d.close();
+    d = new Driver(hiveConf);
+    int[][] values = {{1,2},{3,4},{5,6},{7,8},{9,10}};
+    runStatementOnDriver("insert into " + 
TxnCommandsBaseForTests.Table.ACIDTBL + makeValuesClause(values));//HIVE-17138: 
this creates 1 delta_0000013_0000013_0000/bucket_00001
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T (a int, b int) stored as ORC  
TBLPROPERTIES ('transactional'='true')");
+    /*
+    So Union All removal kicks in and we get 3 subdirs in staging.
+ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree 
/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505516390532/warehouse/t/.hive-staging_hive_2017-09-15_16-05-06_895_1123322677843388168-1/
+└── -ext-10000
+    ├── HIVE_UNION_SUBDIR_19
+    │   └── 000000_0
+    │       ├── _orc_acid_version
+    │       └── delta_0000016_0000016_0001
+    ├── HIVE_UNION_SUBDIR_20
+    │   └── 000000_0
+    │       ├── _orc_acid_version
+    │       └── delta_0000016_0000016_0002
+    └── HIVE_UNION_SUBDIR_21
+        └── 000000_0
+            ├── _orc_acid_version
+            └── delta_0000016_0000016_0003*/
+    runStatementOnDriver("insert into T(a,b) select a, b from " + 
TxnCommandsBaseForTests.Table.ACIDTBL + " where a between 1 and 3 group by a, b 
union all select a, b from " + TxnCommandsBaseForTests.Table.ACIDTBL + " where 
a between 5 and 7 union all select a, b from " + 
TxnCommandsBaseForTests.Table.ACIDTBL + " where a >= 9");
+
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, 
INPUT__FILE__NAME from T order by ROW__ID");
+
+    String expected[][] = {
+      {"{\"transactionid\":16,\"bucketid\":536870913,\"rowid\":0}\t1\t2", 
"/delta_0000016_0000016_0001/bucket_00000"},
+      {"{\"transactionid\":16,\"bucketid\":536870913,\"rowid\":1}\t3\t4", 
"/delta_0000016_0000016_0001/bucket_00000"},
+      {"{\"transactionid\":16,\"bucketid\":536870914,\"rowid\":0}\t7\t8", 
"/delta_0000016_0000016_0002/bucket_00000"},
+      {"{\"transactionid\":16,\"bucketid\":536870914,\"rowid\":1}\t5\t6", 
"/delta_0000016_0000016_0002/bucket_00000"},
+      {"{\"transactionid\":16,\"bucketid\":536870915,\"rowid\":0}\t9\t10", 
"/delta_0000016_0000016_0003/bucket_00000"},
+    };
+    checkExpected(rs, expected, "Unexpected row count after ctas");
+  }
+  private void checkExpected(List<String> rs, String[][] expected, String msg) 
{
+    LOG.warn(testName.getMethodName() + ": read data(" + msg + "): ");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    Assert.assertEquals( testName.getMethodName() + ": " + msg, 
expected.length, rs.size());
+    //verify data and layout
+    for(int i = 0; i < expected.length; i++) {
+      Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), 
rs.get(i).startsWith(expected[i][0]));
+      Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), 
rs.get(i).endsWith(expected[i][1]));
+    }
+  }
+  /**
+   * The idea here is to create a non acid table that was written by multiple writers, i.e.
+   * an unbucketed table that has 000000_0 & 000001_0, for example.  Unfortunately this doesn't work
+   * due to 'merge' logic - see comments in the method
+   */
+  @Ignore
+  @Test
+  public void testToAcidConversionMultiBucket() throws Exception {
+    int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + 
makeValuesClause(values));
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T (a int, b int) stored as ORC  
TBLPROPERTIES ('transactional'='false')");
+    /*T non-acid + non bucketed - 3 writers are created and then followed by a merge to create a single output file
+    though how the data from union is split between writers is a mystery
+    (bucketed tables don't do merge)
+   Processing data file 
file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10000/000000_0
 [length: 515]
+{"a":6,"b":8}
+{"a":9,"b":10}
+{"a":5,"b":6}
+{"a":1,"b":2}
+{"a":2,"b":4}
+________________________________________________________________________________________________________________________
+
+Processing data file 
file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000000_0
 [length: 242]
+{"a":6,"b":8}
+________________________________________________________________________________________________________________________
+
+Processing data file 
file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000001_0
 [length: 244]
+{"a":9,"b":10}
+{"a":5,"b":6}
+________________________________________________________________________________________________________________________
+
+Processing data file 
file:/Users/ekoifman/dev/hiverwgit/ql/target/tmp/org.apache.hadoop.hive.ql.TestTxnNoBuckets-1505317179157/warehouse/t/.hive-staging_hive_2017-09-13_08-40-30_275_8623609103176711840-1/-ext-10003/000002_0
 [length: 242]
+{"a":1,"b":2}
+{"a":2,"b":4}
+ */
+    runStatementOnDriver("insert into T(a,b) select a, b from " + 
Table.ACIDTBL + " where a between 1 and 3 group by a, b union all select a, b 
from " + Table.ACIDTBL + " where a between 5 and 7 union all select a, b from " 
+ Table.ACIDTBL + " where a >= 9");
+    List<String> rs = runStatementOnDriver("select a, b, INPUT__FILE__NAME 
from T order by a, b, INPUT__FILE__NAME");
+    LOG.warn("before converting to acid");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+  }
+  @Test
+  public void testInsertFromUnion() throws Exception {
+    int[][] values = {{1,2},{2,4},{5,6},{6,8},{9,10}};
+    runStatementOnDriver("insert into " + Table.NONACIDNONBUCKET + 
makeValuesClause(values));
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T (a int, b int) stored as ORC  
TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into T(a,b) select a, b from " + 
Table.NONACIDNONBUCKET + " where a between 1 and 3 group by a, b union all 
select a, b from " + Table.NONACIDNONBUCKET + " where a between 5 and 7 union 
all select a, b from " + Table.NONACIDNONBUCKET + " where a >= 9");
+    List<String> rs = runStatementOnDriver("select ROW__ID, a, b, 
INPUT__FILE__NAME from T order by a, b, INPUT__FILE__NAME");
+    LOG.warn("before converting to acid");
+    for(String s : rs) {
+      LOG.warn(s);
+    }
+    /*
+    The number of writers seems to be based on number of MR jobs for the src 
query.  todo check number of FileSinks
+    
warehouse/t/.hive-staging_hive_2017-09-13_08-59-28_141_6304543600372946004-1/-ext-10000/000000_0/delta_0000016_0000016_0000/bucket_00000
 [length: 648]
+    
{"operation":0,"originalTransaction":16,"bucket":536870912,"rowId":0,"currentTransaction":16,"row":{"_col0":1,"_col1":2}}
+    
{"operation":0,"originalTransaction":16,"bucket":536870912,"rowId":1,"currentTransaction":16,"row":{"_col0":2,"_col1":4}}
+    
________________________________________________________________________________________________________________________
+    
warehouse/t/.hive-staging_hive_2017-09-13_08-59-28_141_6304543600372946004-1/-ext-10000/000001_0/delta_0000016_0000016_0000/bucket_00001
 [length: 658]
+    
{"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":0,"currentTransaction":16,"row":{"_col0":5,"_col1":6}}
+    
{"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":1,"currentTransaction":16,"row":{"_col0":6,"_col1":8}}
+    
{"operation":0,"originalTransaction":16,"bucket":536936448,"rowId":2,"currentTransaction":16,"row":{"_col0":9,"_col1":10}}
+    */
+    rs = runStatementOnDriver("select a, b from T order by a, b");
     Assert.assertEquals(stringifyValues(values), rs);
+    rs = runStatementOnDriver("select ROW__ID from T group by ROW__ID having 
count(*) > 1");
+    if(rs.size() > 0) {
+      Assert.assertEquals("Duplicate ROW__IDs: " + rs.get(0), 0, rs.size());
+    }
   }
   /**
    * see HIVE-16177
-   * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()}  todo 
need test with > 1 bucket file
+   * See also {@link TestTxnCommands2#testNonAcidToAcidConversion02()}
    */
   @Test
   public void testToAcidConversion02() throws Exception {
