[ 
https://issues.apache.org/jira/browse/HIVE-23040?focusedWorklogId=426093&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-426093
 ]

ASF GitHub Bot logged work on HIVE-23040:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Apr/20 09:46
            Start Date: 22/Apr/20 09:46
    Worklog Time Spent: 10m 
      Work Description: pkumarsinha commented on a change in pull request #977:
URL: https://github.com/apache/hive/pull/977#discussion_r412835543



##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -663,6 +669,454 @@ public void testMultiDBTxn() throws Throwable {
     }
   }
 
+  @Test
+  public void testIncrementalDumpCheckpointing() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {});
+
+
+    //Case 1: When the last dump finished all the events and
+    //only  _finished_dump file at the hiveDumpRoot was about to be written 
when it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + 
primaryDbName)
+            .run("insert into t1 values (1)")
+            .run("insert into t2 values (2)")
+            .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, 
ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, 
ReplAck.EVENTS_DUMP.toString());
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+
+    long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1;
+    long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
+    assertTrue(lastIncEventID > (firstIncEventID + 1));
+    Map<Path, Long> pathModTimeMap = new HashMap<>();
+    for (long eventId=firstIncEventID; eventId<=lastIncEventID; eventId++) {
+      Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
+      if (fs.exists(eventRoot)) {
+        for (FileStatus fileStatus: fs.listStatus(eventRoot)) {
+          pathModTimeMap.put(fileStatus.getPath(), 
fileStatus.getModificationTime());
+        }
+      }
+    }
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName);
+    assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
+    assertTrue(fs.exists(ackFile));
+    //check events were not rewritten.
+    for(Map.Entry<Path, Long> entry :pathModTimeMap.entrySet()) {
+      assertEquals((long)entry.getValue(), fs.getFileStatus(new 
Path(hiveDumpDir, entry.getKey())).getModificationTime());
+    }
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"2"});
+
+
+    //Case 2: When the last dump was half way through
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump3 = primary.run("use " + 
primaryDbName)
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (4)")
+            .dump(primaryDbName);
+
+    hiveDumpDir = new Path(incrementalDump3.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    ackLastEventID = new Path(hiveDumpDir, ReplAck.EVENTS_DUMP.toString());
+    fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+
+    fs.delete(ackFile, false);
+    //delete last three events and test if it recovers.
+    long lastEventID = Long.parseLong(incrementalDump3.lastReplicationId);
+    Path lastEvtRoot = new Path(hiveDumpDir + File.separator + 
String.valueOf(lastEventID));
+    Path secondLastEvtRoot = new Path(hiveDumpDir + File.separator + 
String.valueOf(lastEventID - 1));
+    Path thirdLastEvtRoot = new Path(hiveDumpDir + File.separator + 
String.valueOf(lastEventID - 2));
+    assertTrue(fs.exists(lastEvtRoot));
+    assertTrue(fs.exists(secondLastEvtRoot));
+    assertTrue(fs.exists(thirdLastEvtRoot));
+
+    pathModTimeMap = new HashMap<>();
+    for (long idx = Long.parseLong(incrementalDump2.lastReplicationId)+1; idx 
< (lastEventID - 2); idx ++) {
+      Path eventRoot  = new Path(hiveDumpDir, String.valueOf(idx));
+      if (fs.exists(eventRoot)) {
+        for (FileStatus fileStatus: fs.listStatus(eventRoot)) {
+          pathModTimeMap.put(fileStatus.getPath(), 
fileStatus.getModificationTime());
+        }
+      }
+    }
+    long lastEvtModTimeOld = 
fs.getFileStatus(lastEvtRoot).getModificationTime();
+    long secondLastEvtModTimeOld = 
fs.getFileStatus(secondLastEvtRoot).getModificationTime();
+    long thirdLastEvtModTimeOld = 
fs.getFileStatus(thirdLastEvtRoot).getModificationTime();
+
+    fs.delete(lastEvtRoot, true);
+    fs.delete(secondLastEvtRoot, true);
+    fs.delete(thirdLastEvtRoot, true);
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
+            Arrays.asList(
+                    LAST_EVENT_ID_NAME,
+                    String.valueOf(lastEventID - 3)
+            )
+    );
+    org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(listValues, 
ackLastEventID, primary.hiveConf, true);
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+
+    WarehouseInstance.Tuple incrementalDump4 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName);
+
+    assertEquals(incrementalDump3.dumpLocation, incrementalDump4.dumpLocation);
+
+    verifyPathExist(fs, ackFile);
+    verifyPathExist(fs, ackLastEventID);
+    verifyPathExist(fs, lastEvtRoot);
+    verifyPathExist(fs, secondLastEvtRoot);
+    verifyPathExist(fs, thirdLastEvtRoot);
+    assertTrue(fs.getFileStatus(lastEvtRoot).getModificationTime() > 
lastEvtModTimeOld);
+    assertTrue(fs.getFileStatus(secondLastEvtRoot).getModificationTime() > 
secondLastEvtModTimeOld);
+    assertTrue(fs.getFileStatus(thirdLastEvtRoot).getModificationTime() > 
thirdLastEvtModTimeOld);
+
+    //Check other event dump files have not been modified.
+    for (Map.Entry<Path, Long> entry:pathModTimeMap.entrySet()) {
+      assertEquals((long)entry.getValue(), 
fs.getFileStatus(entry.getKey()).getModificationTime());
+    }
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "3"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"2", "4"});
+  }
+
+  @Test
+  public void testIncrementalResumeDump() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {});
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + 
primaryDbName)
+            .run("insert into t1 values (1)")
+            .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, 
ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, 
ReplAck.EVENTS_DUMP.toString());
+    Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata");
+
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(ackLastEventID));
+    assertTrue(fs.exists(dumpMetaData));
+
+    fs.delete(ackLastEventID, false);
+    fs.delete(ackFile, false);
+    //delete only last event root dir
+    Path lastEventRoot = new Path(hiveDumpDir, 
String.valueOf(incrementalDump1.lastReplicationId));
+    assertTrue(fs.exists(lastEventRoot));
+    fs.delete(lastEventRoot, true);
+
+    // It should create a fresh dump dir as _events_dump doesn't exist.
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName);
+    assertTrue(incrementalDump1.dumpLocation != incrementalDump2.dumpLocation);
+    assertTrue(incrementalDump1.lastReplicationId != 
incrementalDump2.lastReplicationId);
+    assertTrue(fs.getFileStatus(new 
Path(incrementalDump2.dumpLocation)).getModificationTime()
+            > fs.getFileStatus(new 
Path(incrementalDump1.dumpLocation)).getModificationTime());
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"});
+  }
+
+  @Test
+  public void testCheckpointingOnFirstEventDump() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {});
+
+    // Testing a scenario where first event was getting dumped and it failed.
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + 
primaryDbName)
+            .run("insert into t1 values (1)")
+            .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, 
ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, 
ReplAck.EVENTS_DUMP.toString());
+    Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata");
+
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(dumpMetaData));
+
+    fs.delete(ackFile, false);
+    fs.delete(ackLastEventID, false);
+    fs.delete(dumpMetaData, false);
+    //delete all the event folder except first one.
+    long firstIncEventID = Long.parseLong(bootstrapDump.lastReplicationId) + 1;
+    long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
+    assertTrue(lastIncEventID > (firstIncEventID + 1));
+
+    for (long eventId=firstIncEventID + 1; eventId<=lastIncEventID; eventId++) 
{
+      Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
+      if (fs.exists(eventRoot)) {
+        fs.delete(eventRoot, true);
+      }
+    }
+
+    Path firstIncEventRoot =  new Path(hiveDumpDir, 
String.valueOf(firstIncEventID));
+    long firstIncEventModTimeOld = 
fs.getFileStatus(firstIncEventRoot).getModificationTime();
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName);
+
+    assertTrue(incrementalDump1.dumpLocation != incrementalDump2.dumpLocation);
+    hiveDumpDir = new Path(incrementalDump2.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    firstIncEventRoot =  new Path(hiveDumpDir, 
String.valueOf(firstIncEventID));
+    assertTrue(fs.exists(ackFile));
+    long firstIncEventModTimeNew =  
fs.getFileStatus(firstIncEventRoot).getModificationTime();
+    assertTrue(firstIncEventModTimeNew > firstIncEventModTimeOld);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"});
+  }
+
+  @Test
+  public void testCheckpointingIncrWithTableDrop() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("insert into t1 values (1)")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .run("insert into t2 values (2)")
+            .dump(primaryDbName);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"2"});
+
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + 
primaryDbName)
+            .run("CREATE TABLE t3(a string) STORED AS TEXTFILE")
+            .run("insert into t3 values (3)")
+            .run("insert into t1 values (4)")
+            .run("DROP TABLE t2")
+            .dump(primaryDbName);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, 
ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, 
ReplAck.EVENTS_DUMP.toString());
+    Path dumpMetaData = new Path(hiveDumpDir, "_dumpmetadata");
+
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+    assertTrue(fs.exists(ackFile));
+    assertTrue(fs.exists(dumpMetaData));
+
+    fs.delete(ackFile, false);
+    fs.delete(dumpMetaData, false);
+    //delete last five events
+    long fifthLastIncEventID = 
Long.parseLong(incrementalDump1.lastReplicationId) - 4;
+    long lastIncEventID = Long.parseLong(incrementalDump1.lastReplicationId);
+    assertTrue(lastIncEventID > fifthLastIncEventID );
+
+    for (long eventId=fifthLastIncEventID + 1; eventId<=lastIncEventID; 
eventId++) {
+      Path eventRoot = new Path(hiveDumpDir, String.valueOf(eventId));
+      if (fs.exists(eventRoot)) {
+        fs.delete(eventRoot, true);
+      }
+    }
+
+    List<List<String>> listValues = new ArrayList<>();
+    listValues.add(
+            Arrays.asList(
+                    LAST_EVENT_ID_NAME,
+                    String.valueOf(fifthLastIncEventID)
+            )
+    );
+    org.apache.hadoop.hive.ql.parse.repl.dump.Utils.writeOutput(listValues, 
ackLastEventID, primary.hiveConf, true);
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName);
+
+    assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
+    ackFile = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    assertTrue(fs.exists(ackFile));
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "4"})
+            .run("select * from " + replicatedDbName + ".t3")
+            .verifyResults(new String[] {"3"})
+            .runFailure("select * from " + replicatedDbName + ".t2");
+  }
+
+  @Test
+  public void testCheckPointingDataDumpFailureBootstrapDuringIncremental() 
throws Throwable {
+    List<String> dumpClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + 
"'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + 
"'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() + 
"'");
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("create table t1(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .dump(primaryDbName, dumpClause);
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "2"});
+
+    dumpClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + 
"'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + 
"'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() + 
"'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'");
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + 
primaryDbName)
+            .run("create table t2(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t2 values (3)")
+            .run("insert into t2 values (4)")
+            .run("insert into t2 values (5)")
+            .dump(primaryDbName, dumpClause);
+
+    Path hiveDumpDir = new Path(incrementalDump1.dumpLocation, 
ReplUtils.REPL_HIVE_BASE_DIR);
+    Path ackFile = new Path(hiveDumpDir, 
ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
+    Path ackLastEventID = new Path(hiveDumpDir, 
ReplAck.EVENTS_DUMP.toString());
+    Path bootstrapDir = new Path(hiveDumpDir, 
ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
+    Path metaDir = new Path(bootstrapDir, EximUtil.METADATA_PATH_NAME);
+
+    Path t2dataDir = new Path(bootstrapDir, EximUtil.DATA_PATH_NAME + 
File.separator
+            + primaryDbName + File.separator + "t2");
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), primary.hiveConf);
+
+    verifyPathExist(fs, ackFile);
+    verifyPathExist(fs, ackLastEventID);
+
+    long oldMetadirModTime = fs.getFileStatus(metaDir).getModificationTime();
+    long oldT2DatadirModTime = 
fs.getFileStatus(t2dataDir).getModificationTime();
+
+    fs.delete(ackFile, false);
+
+    //Do another dump and test the rewrite happended for meta and no write for 
data folder
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName, dumpClause);
+    assertEquals(incrementalDump1.dumpLocation, incrementalDump2.dumpLocation);
+    assertTrue(fs.exists(ackFile));
+    verifyPathExist(fs, ackFile);
+    verifyPathExist(fs, ackLastEventID);
+
+    long newMetadirModTime = fs.getFileStatus(metaDir).getModificationTime();
+    long newT2DatadirModTime = 
fs.getFileStatus(t2dataDir).getModificationTime();
+
+    assertTrue(newMetadirModTime > oldMetadirModTime);
+    assertEquals(oldT2DatadirModTime, newT2DatadirModTime);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"3", "4", "5"});
+  }
+
+  @Test
+  public void testHdfsMaxDirItemsLimitDuringIncremental() throws Throwable {
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("create table t1(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (1)")
+            .dump(primaryDbName);
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"});
+
+    List<String> dumpClause = Arrays.asList("'" + 
ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + "'='"
+            + (ReplUtils.RESERVED_DIR_ITEMS_COUNT + 5) +"'",
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'");
+
+    WarehouseInstance.Tuple incrementalDump1 = primary.run("use " + 
primaryDbName)
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t1 values (4)")
+            .run("insert into t1 values (5)")
+            .run("insert into t1 values (6)")
+            .run("insert into t1 values (7)")
+            .run("insert into t1 values (8)")
+            .run("insert into t1 values (9)")
+            .run("insert into t1 values (10)")
+            .run("create table t2(a int) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t2 values (100)")
+            .dump(primaryDbName, dumpClause);
+
+    int eventCount = Integer.parseInt(incrementalDump1.lastReplicationId)
+            - Integer.parseInt(bootstrapDump.lastReplicationId);
+    assertEquals(eventCount, 5);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[] {"100"});
+
+    dumpClause = Arrays.asList("'" + ReplUtils.DFS_MAX_DIR_ITEMS_CONFIG + 
"'='1000'");
+
+    WarehouseInstance.Tuple incrementalDump2 = primary.run("use " + 
primaryDbName)
+            .dump(primaryDbName, dumpClause);
+
+    eventCount = Integer.parseInt(incrementalDump2.lastReplicationId)
+            - Integer.parseInt(incrementalDump1.lastReplicationId);
+    assertTrue(eventCount > 5);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "2", "3", "4", "5", "6", "7", 
"8", "9", "10"});
+  }
+

Review comment:
       Already have a test which covers deletion of the _event_id . 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 426093)
    Time Spent: 1h 20m  (was: 1h 10m)

> Checkpointing for repl dump incremental phase
> ---------------------------------------------
>
>                 Key: HIVE-23040
>                 URL: https://issues.apache.org/jira/browse/HIVE-23040
>             Project: Hive
>          Issue Type: Improvement
>            Reporter: Aasha Medhi
>            Assignee: PRAVIN KUMAR SINHA
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: HIVE-23040.01.patch, HIVE-23040.02.patch, 
> HIVE-23040.03.patch
>
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to