deniskuzZ commented on a change in pull request #1474:
URL: https://github.com/apache/hive/pull/1474#discussion_r484767687



##########
File path: ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
##########
@@ -2315,6 +2386,139 @@ private void testConcurrentMergeInsertNoDuplicates(String query, boolean sharedW
     List res = new ArrayList();
     driver.getFetchTask().fetch(res);
     Assert.assertEquals("Duplicate records found", 4, res.size());
+    dropTable(new String[]{"target", "source"});
+  }
+
+  /**
+   * ValidTxnManager.isValidTxnListState can invalidate a snapshot if a relevant write transaction was committed
+   * between a query compilation and lock acquisition. When this happens we have to recompile the given query,
+   * otherwise we can miss reading partitions created between. The following three cases test these scenarios.
+   * @throws Exception ex
+   */
+  @Test
+  public void testMergeInsertDynamicPartitioningSequential() throws Exception {
+    dropTable(new String[]{"target", "source"});
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
+
+    // Create partition c=1
+    driver.run("create table target (a int, b int) partitioned by (c int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into target values (1,1,1), (2,2,1)");
+    //Create partition c=2
+    driver.run("create table source (a int, b int) partitioned by (c int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    driver.run("insert into source values (3,3,2), (4,4,2)");
+
+    // txn 1 inserts data to an old and a new partition
+    driver.run("insert into source values (5,5,2), (6,6,3)");
+
+    // txn 2 inserts into the target table into a new partition ( and a duplicate considering the source table)
+    driver.run("insert into target values (3, 3, 2)");
+
+    // txn3 merge
+    driver.run("merge into target t using source s on t.a = s.a " +
+      "when not matched then insert values (s.a, s.b, s.c)");
+    driver.run("select * from target");
+    List res = new ArrayList();
+    driver.getFetchTask().fetch(res);
+    // The merge should see all three partition and not create duplicates
+    Assert.assertEquals("Duplicate records found", 6, res.size());
+    Assert.assertTrue("Partition 3 was skipped", res.contains("6\t6\t3"));
+    dropTable(new String[]{"target", "source"});
+  }
+
+  @Test
+  public void testMergeInsertDynamicPartitioningSnapshotInvalidatedWithOldCommit() throws Exception {
+    // By creating the driver with the factory, we should have a ReExecDriver
+    IDriver driver3 = DriverFactory.newDriver(conf);
+    Assert.assertTrue("ReExecDriver was expected", driver3 instanceof ReExecDriver);

Review comment:
       changed

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/Driver.java
##########
@@ -488,30 +489,40 @@ private void runInternal(String command, boolean alreadyCompiled) throws Command
 
       lockAndRespond();
 
+      int retryShapshotCnt = 0;
+      int maxRetrySnapshotCnt = HiveConf.getIntVar(driverContext.getConf(),
+        HiveConf.ConfVars.HIVE_TXN_MAX_RETRYSNAPSHOT_COUNT);
+
       try {
-        if (!driverTxnHandler.isValidTxnListState()) {
-          LOG.info("Compiling after acquiring locks");
+        while (!driverTxnHandler.isValidTxnListState() && ++retryShapshotCnt <= maxRetrySnapshotCnt) {
+          LOG.info("Compiling after acquiring locks, attempt #" + retryShapshotCnt);
           // Snapshot was outdated when locks were acquired, hence regenerate context,
           // txn list and retry
           // TODO: Lock acquisition should be moved before analyze, this is a bit hackish.
           // Currently, we acquire a snapshot, we compile the query wrt that snapshot,
           // and then, we acquire locks. If snapshot is still valid, we continue as usual.
           // But if snapshot is not valid, we recompile the query.
           if (driverContext.isOutdatedTxn()) {
+            LOG.info("Snapshot is outdated, re-initiating transaction ...");
             driverContext.getTxnManager().rollbackTxn();
 
             String userFromUGI = DriverUtils.getUserFromUGI(driverContext);
             driverContext.getTxnManager().openTxn(context, userFromUGI, driverContext.getTxnType());
             lockAndRespond();
           }
+
           driverContext.setRetrial(true);
           driverContext.getBackupContext().addSubContext(context);
           driverContext.getBackupContext().setHiveLocks(context.getHiveLocks());
           context = driverContext.getBackupContext();
+
           driverContext.getConf().set(ValidTxnList.VALID_TXNS_KEY,
             driverContext.getTxnManager().getValidTxns().toString());
+
           if (driverContext.getPlan().hasAcidResourcesInQuery()) {
+            compileInternal(context.getCmd(), true);
             driverTxnHandler.recordValidWriteIds();
+            driverTxnHandler.setWriteIdForAcidFileSinks();
           }
 
           if (!alreadyCompiled) {

Review comment:
       removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to