[ https://issues.apache.org/jira/browse/HIVE-21052?focusedWorklogId=495287&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-495287 ]

ASF GitHub Bot logged work on HIVE-21052:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Oct/20 11:58
            Start Date: 05/Oct/20 11:58
    Worklog Time Spent: 10m 
      Work Description: deniskuzZ commented on a change in pull request #1548:
URL: https://github.com/apache/hive/pull/1548#discussion_r499544579



##########
File path: ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
##########
@@ -2128,6 +2128,395 @@ public void testCleanerForTxnToWriteId() throws Exception {
             0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
   }
 
+  @Test
+  public void testMmTableAbortWithCompaction() throws Exception {
+    // 1. Insert some rows into MM table
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)");
+    // There should be 1 delta directory
+    int[][] resultData1 = new int[][] {{1,2}};
+    verifyDeltaDirAndResult(1, Table.MMTBL.toString(), "", resultData1);
+    List<String> r1 = runStatementOnDriver("select count(*) from " + Table.MMTBL);
+    Assert.assertEquals("1", r1.get(0));
+
+    // 2. Let a transaction be aborted
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+    // There should be 2 delta directories; the second one belongs to the aborted transaction.
+    verifyDeltaDirAndResult(2, Table.MMTBL.toString(), "", resultData1);
+
+    r1 = runStatementOnDriver("select count(*) from " + Table.MMTBL);
+    Assert.assertEquals("1", r1.get(0));
+
+    // 3. Insert a few more rows and verify the query result
+    int[][] resultData2 = new int[][] {{1,2}, {5,6}};
+
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)");
+    verifyDeltaDirAndResult(3, Table.MMTBL.toString(), "", resultData2);
+    r1 = runStatementOnDriver("select count(*) from " + Table.MMTBL);
+    Assert.assertEquals("2", r1.get(0));

Review comment:
       fixed, turned off StatsOptimizer
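
Context: with hive.compute.query.using.stats enabled, the StatsOptimizer answers
"select count(*)" from table statistics without scanning the delta/base files these
assertions exercise. A minimal sketch of such a change, assuming the fix toggles that
standard flag in the test setup (an assumption; the actual patch may differ):

    // Force count(*) to scan the data files instead of answering from stats,
    // so the assertions reflect the on-disk delta/base state.
    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_COMPUTE_QUERY_USING_STATS, false);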

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
##########
@@ -2128,6 +2128,395 @@ public void testCleanerForTxnToWriteId() throws Exception {
             0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
   }
 
+  @Test
+  public void testMmTableAbortWithCompaction() throws Exception {
+    // 1. Insert some rows into MM table
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)");
+    // There should be 1 delta directory
+    int[][] resultData1 = new int[][] {{1,2}};
+    verifyDeltaDirAndResult(1, Table.MMTBL.toString(), "", resultData1);
+    List<String> r1 = runStatementOnDriver("select count(*) from " + Table.MMTBL);
+    Assert.assertEquals("1", r1.get(0));
+
+    // 2. Let a transaction be aborted
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+    // There should be 2 delta directories; the second one belongs to the aborted transaction.
+    verifyDeltaDirAndResult(2, Table.MMTBL.toString(), "", resultData1);
+
+    r1 = runStatementOnDriver("select count(*) from " + Table.MMTBL);
+    Assert.assertEquals("1", r1.get(0));
+
+    // 3. Insert a few more rows and verify the query result
+    int[][] resultData2 = new int[][] {{1,2}, {5,6}};
+
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)");
+    verifyDeltaDirAndResult(3, Table.MMTBL.toString(), "", resultData2);
+    r1 = runStatementOnDriver("select count(*) from " + Table.MMTBL);
+    Assert.assertEquals("2", r1.get(0));
+
+    // 4. Perform a MINOR compaction; the expectation is that it removes the aborted delta dir
+    runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
+    // The worker should remove the subdir for the aborted transaction
+    runWorker(hiveConf);
+    verifyDeltaDirAndResult(2, Table.MMTBL.toString(), "", resultData2);
+    verifyBaseDirAndResult(0, Table.MMTBL.toString(), "", resultData2);
+    // 5. Run Cleaner. Shouldn't impact anything.
+    runCleaner(hiveConf);
+    // 6. Run Initiator to remove the aborted entry from the TXNS table
+    runInitiator(hiveConf);
+
+    // Verify query result
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a");
+    Assert.assertEquals(stringifyValues(resultData2), rs);
+
+    int[][] resultData3 = new int[][] {{1,2}, {5,6}, {7,8}};
+    // 7. Add a few more rows
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(7,8)");
+    // 8. add one more aborted delta
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(9,10)");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+
+    // 9. Perform a MAJOR compaction; the expectation is that it removes the aborted delta dir
+    runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MAJOR'");
+    verifyDeltaDirAndResult(4, Table.MMTBL.toString(), "", resultData3);
+    runWorker(hiveConf);
+    verifyDeltaDirAndResult(3, Table.MMTBL.toString(), "", resultData3);
+    verifyBaseDirAndResult(1, Table.MMTBL.toString(), "", resultData3);
+    runCleaner(hiveConf);
+    verifyDeltaDirAndResult(0, Table.MMTBL.toString(), "", resultData3);
+    verifyBaseDirAndResult(1, Table.MMTBL.toString(), "", resultData3);
+    runInitiator(hiveConf);
+    verifyDeltaDirAndResult(0, Table.MMTBL.toString(), "", resultData3);
+    verifyBaseDirAndResult(1, Table.MMTBL.toString(), "", resultData3);
+
+    // Verify query result
+    rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a");
+    Assert.assertEquals(stringifyValues(resultData3), rs);
+  }
+
+  @Test
+  public void testMmTableAbortWithCompactionNoCleanup() throws Exception {
+    // 1. Insert some rows into MM table
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)");
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)");
+    // There should be 2 delta directories
+    int[][] resultData1 = new int[][] {{1,2}, {5,6}};
+    verifyDeltaDirAndResult(2, Table.MMTBL.toString(), "", resultData1);
+    List<String> r1 = runStatementOnDriver("select count(*) from " + Table.MMTBL);
+    Assert.assertEquals("2", r1.get(0));
+
+    // 2. Let a transaction be aborted
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + Table.MMTBL + " values(3,4)");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+    // There should be 3 delta directories; the third one belongs to the aborted transaction.
+    verifyDeltaDirAndResult(3, Table.MMTBL.toString(), "", resultData1);
+    r1 = runStatementOnDriver("select count(*) from " + Table.MMTBL);
+    Assert.assertEquals("2", r1.get(0));
+
+    // 3. Perform a MINOR compaction; the expectation is that it removes the aborted delta dir
+    runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
+    // The worker should remove the subdir for the aborted transaction
+    runWorker(hiveConf);
+    verifyDeltaDirAndResult(2, Table.MMTBL.toString(), "", resultData1);
+    verifyBaseDirAndResult(0, Table.MMTBL.toString(), "", resultData1);
+    // Verify query result
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a");
+    Assert.assertEquals(stringifyValues(resultData1), rs);
+
+    int[][] resultData3 = new int[][] {{1,2}, {5,6}, {7,8}};
+    // 4. Add a few more rows
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(7,8)");
+    // 5. add one more aborted delta
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(9,10)");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+    verifyDeltaDirAndResult(4, Table.MMTBL.toString(), "", resultData3);
+
+    // 6. Perform a MAJOR compaction; the expectation is that it removes the aborted delta dir
+    runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    verifyDeltaDirAndResult(3, Table.MMTBL.toString(), "", resultData3);
+    verifyBaseDirAndResult(1, Table.MMTBL.toString(), "", resultData3);
+
+    // 7. Run one more MAJOR compaction; this should not have any effect
+    runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MAJOR'");
+    runWorker(hiveConf);
+    verifyDeltaDirAndResult(3, Table.MMTBL.toString(), "", resultData3);
+    verifyBaseDirAndResult(1, Table.MMTBL.toString(), "", resultData3);
+
+    runCleaner(hiveConf);
+
+    // Verify query result
+    rs = runStatementOnDriver("select a,b from " + Table.MMTBL + " order by a");
+    Assert.assertEquals(stringifyValues(resultData3), rs);
+  }
+
+  @Test
+  public void testFullACIDAbortWithMinorMajorCompaction() throws Exception {
+    // 1. Insert some rows into acid table
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(1,2)");
+    // There should be 1 delta directory
+    int[][] resultData1 = new int[][] {{1,2}};
+    verifyDeltaDirAndResult(1, Table.ACIDTBL.toString(), "", resultData1);
+    List<String> r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBL);
+    Assert.assertEquals("1", r1.get(0));
+
+    // 2. Let a transaction be aborted
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(3,4)");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+    // There should be 2 delta directories.
+    verifyDeltaDirAndResult(2, Table.ACIDTBL.toString(), "", resultData1);
+
+    r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBL);
+    Assert.assertEquals("1", r1.get(0));
+
+    // Verify query result
+    int[][] resultData2 = new int[][] {{1,2}, {5,6}};
+    // 3. Insert a few more rows into the ACID table
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) values(5,6)");
+    verifyDeltaDirAndResult(3, Table.ACIDTBL.toString(), "", resultData2);
+    r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBL);
+    Assert.assertEquals("2", r1.get(0));

Review comment:
       fixed, turned off StatsOptimizer




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 495287)
    Time Spent: 2h 50m  (was: 2h 40m)

> Make sure transactions get cleaned if they are aborted before addPartitions 
> is called
> -------------------------------------------------------------------------------------
>
>                 Key: HIVE-21052
>                 URL: https://issues.apache.org/jira/browse/HIVE-21052
>             Project: Hive
>          Issue Type: Bug
>          Components: Transactions
>    Affects Versions: 3.0.0, 3.1.1
>            Reporter: Jaume M
>            Assignee: Jaume M
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: Aborted Txn w_Direct Write.pdf, HIVE-21052.1.patch, 
> HIVE-21052.10.patch, HIVE-21052.11.patch, HIVE-21052.12.patch, 
> HIVE-21052.2.patch, HIVE-21052.3.patch, HIVE-21052.4.patch, 
> HIVE-21052.5.patch, HIVE-21052.6.patch, HIVE-21052.7.patch, 
> HIVE-21052.8.patch, HIVE-21052.9.patch
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> If the transaction is aborted between openTxn and addPartitions, and data has 
> been written to the table, the transaction manager will think it's an empty 
> transaction and no cleaning will be done.
> This is currently an issue in the streaming API and in micromanaged tables. 
> As proposed by [~ekoifman] this can be solved by:
> * Writing an entry with a special marker to TXN_COMPONENTS at openTxn; when 
> addPartitions is called, remove this entry from TXN_COMPONENTS and add the 
> corresponding partition entry to TXN_COMPONENTS (a sketch of this flow 
> follows below).
> * If the cleaner finds an entry with this special marker in TXN_COMPONENTS, 
> indicating that a transaction was opened and then aborted, it must generate 
> jobs for the worker for every possible partition available.
> cc [~ewohlstadter]
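
To make the proposed flow concrete, here is a minimal sketch against the metastore's
TXN_COMPONENTS table using plain JDBC. The columns TC_TXNID, TC_DATABASE, TC_TABLE
and TC_PARTITION come from the metastore schema; the marker value, class and method
names below are hypothetical illustrations, not the actual HIVE-21052 patch:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;

    // Hypothetical sketch of the proposed TXN_COMPONENTS marker flow.
    public class TxnMarkerSketch {
      // Hypothetical TC_PARTITION value meaning "txn opened, partitions not yet registered".
      private static final String MARKER = "__OPEN_TXN_MARKER__";

      // At openTxn: record a marker row so an abort before addPartitions stays
      // visible to the cleaner instead of looking like an empty transaction.
      public static void onOpenTxn(Connection db, long txnId, String dbName, String table)
          throws SQLException {
        try (PreparedStatement ps = db.prepareStatement(
            "insert into TXN_COMPONENTS (TC_TXNID, TC_DATABASE, TC_TABLE, TC_PARTITION) "
                + "values (?, ?, ?, ?)")) {
          ps.setLong(1, txnId);
          ps.setString(2, dbName);
          ps.setString(3, table);
          ps.setString(4, MARKER);
          ps.executeUpdate();
        }
      }

      // At addPartitions: swap the marker row for one row per real partition.
      public static void onAddPartitions(Connection db, long txnId, String dbName,
          String table, List<String> partitions) throws SQLException {
        try (PreparedStatement del = db.prepareStatement(
            "delete from TXN_COMPONENTS where TC_TXNID = ? and TC_PARTITION = ?")) {
          del.setLong(1, txnId);
          del.setString(2, MARKER);
          del.executeUpdate();
        }
        try (PreparedStatement ins = db.prepareStatement(
            "insert into TXN_COMPONENTS (TC_TXNID, TC_DATABASE, TC_TABLE, TC_PARTITION) "
                + "values (?, ?, ?, ?)")) {
          for (String part : partitions) {
            ins.setLong(1, txnId);
            ins.setString(2, dbName);
            ins.setString(3, table);
            ins.setString(4, part);
            ins.addBatch();
          }
          ins.executeBatch();
        }
      }
    }

If the transaction aborts while the marker row is still present, the cleaner can treat
the marker as "unknown partitions" and schedule worker jobs for every partition of the
table, as the second bullet above describes.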



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
