Repository: hive
Updated Branches:
  refs/heads/master b5b637127 -> e8e0396c1


HIVE-20941: Compactor produces a delete_delta_x_y even if there are no input 
delete events (Igor Kryvenko via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7a3cac29
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7a3cac29
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7a3cac29

Branch: refs/heads/master
Commit: 7a3cac298b882e5a81838ba74074e0633294e479
Parents: b5b6371
Author: Igor Kryvenko <kryvenko7i...@gmail.com>
Authored: Thu Dec 13 15:52:40 2018 -0800
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Thu Dec 13 15:52:40 2018 -0800

----------------------------------------------------------------------
 .../hive/ql/txn/compactor/TestCompactor.java    | 41 ++--------------
 .../hive/ql/txn/compactor/CompactorMR.java      | 49 ++++++++------------
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 26 +++++++++++
 .../apache/hadoop/hive/ql/TestTxnCommands3.java | 14 ++----
 4 files changed, 55 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7a3cac29/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index beb36d7..5af047f 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -17,11 +17,11 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -80,7 +80,6 @@ import 
org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hive.common.util.Retry;
-import org.apache.hive.common.util.RetryTestRunner;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
 import org.apache.hive.hcatalog.streaming.HiveEndPoint;
@@ -1459,25 +1458,10 @@ public class TestCompactor {
     checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, 
columnNamesProperty, columnTypesProperty,
       0, 1L, 2L, 1);
 
-    // Verify that we have got correct set of delete_deltas.
+    //Assert that we have no delete deltas if there are no input delete events.
     FileStatus[] deleteDeltaStat =
       fs.listStatus(new Path(table.getSd().getLocation()), 
AcidUtils.deleteEventDeltaDirFilter);
-    String[] deleteDeltas = new String[deleteDeltaStat.length];
-    Path minorCompactedDeleteDelta = null;
-    for (int i = 0; i < deleteDeltas.length; i++) {
-      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-      if (deleteDeltas[i].equals("delete_delta_0000001_0000002_v0000005")) {
-        minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
-      }
-    }
-    Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new 
String[]{"delete_delta_0000001_0000002_v0000005"};
-    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
-      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", 
found: " + Arrays.toString(deleteDeltas));
-    }
-    // There should be no rows in the delete_delta because there have been no 
delete events.
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, 
columnNamesProperty, columnTypesProperty, 0,
-      0L, 0L, 1);
+    assertEquals(0, deleteDeltaStat.length);
   }
 
   @Test
@@ -1550,25 +1534,10 @@ public class TestCompactor {
     checkExpectedTxnsPresent(null, new Path[]{resultFile}, 
columnNamesProperty, columnTypesProperty,
       0, 1L, 4L, 1);
 
-    // Verify that we have got correct set of delete_deltas also
+    //Assert that we have no delete deltas if there are no input delete events.
     FileStatus[] deleteDeltaStat =
       fs.listStatus(new Path(table.getSd().getLocation()), 
AcidUtils.deleteEventDeltaDirFilter);
-    String[] deleteDeltas = new String[deleteDeltaStat.length];
-    Path minorCompactedDeleteDelta = null;
-    for (int i = 0; i < deleteDeltas.length; i++) {
-      deleteDeltas[i] = deleteDeltaStat[i].getPath().getName();
-      if (deleteDeltas[i].equals("delete_delta_0000001_0000004_v0000009")) {
-        minorCompactedDeleteDelta = deleteDeltaStat[i].getPath();
-      }
-    }
-    Arrays.sort(deleteDeltas);
-    String[] expectedDeleteDeltas = new 
String[]{"delete_delta_0000001_0000004_v0000009"};
-    if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) {
-      Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", 
found: " + Arrays.toString(deleteDeltas));
-    }
-    // There should be no rows in the delete_delta because there have been no 
delete events.
-    checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, 
columnNamesProperty, columnTypesProperty, 0,
-      0L, 0L, 1);
+    assertEquals(0, deleteDeltaStat.length);
 
     if (connection1 != null) {
       connection1.close();

http://git-wip-us.apache.org/repos/asf/hive/blob/7a3cac29/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index c6cb7c5..42ce174 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -962,23 +962,17 @@ public class CompactorMR {
       V value = reader.createValue();
       getWriter(reporter, reader.getObjectInspector(), split.getBucket());
 
-      AcidUtils.AcidOperationalProperties acidOperationalProperties
-          = AcidUtils.getAcidOperationalProperties(jobConf);
-
-      if (!isMajor && acidOperationalProperties.isSplitUpdate()) {
-        // When split-update is enabled for ACID, we initialize a separate 
deleteEventWriter
-        // that is used to write all the delete events (in case of minor 
compaction only). For major
-        // compaction, history is not required to be maintained hence the 
delete events are processed
-        // but not re-written separately.
-        getDeleteEventWriter(reporter, reader.getObjectInspector(), 
split.getBucket());
-      }
+      AcidUtils.AcidOperationalProperties acidOperationalProperties = 
AcidUtils.getAcidOperationalProperties(jobConf);
 
       while (reader.next(identifier, value)) {
         boolean sawDeleteRecord = reader.isDelete(value);
-        if (isMajor && sawDeleteRecord) continue;
-        if (sawDeleteRecord && deleteEventWriter != null) {
-          // When minor compacting, write delete events to a separate file 
when split-update is
-          // turned on.
+        if (isMajor && sawDeleteRecord) {
+          continue;
+        }
+        if (sawDeleteRecord && acidOperationalProperties.isSplitUpdate()) {
+          if (deleteEventWriter == null) {
+            getDeleteEventWriter(reporter, reader.getObjectInspector(), 
split.getBucket());
+          }
           deleteEventWriter.write(value);
           reporter.progress();
         } else {
@@ -1027,7 +1021,7 @@ public class CompactorMR {
             .bucket(bucket)
             .statementId(-1)//setting statementId == -1 makes compacted delta 
files use
             .visibilityTxnId(getCompactorTxnId());
-        //delta_xxxx_yyyy format
+      //delta_xxxx_yyyy format
 
         // Instantiate the underlying output format
         @SuppressWarnings("unchecked")//since there is no way to parametrize 
instance of Class
@@ -1040,28 +1034,25 @@ public class CompactorMR {
 
     private void getDeleteEventWriter(Reporter reporter, ObjectInspector 
inspector,
         int bucket) throws IOException {
-      if (deleteEventWriter == null) {
-        AcidOutputFormat.Options options = new 
AcidOutputFormat.Options(jobConf);
-        options.inspector(inspector)
-          .writingBase(false)
+
+      AcidOutputFormat.Options options = new AcidOutputFormat.Options(jobConf);
+      options.inspector(inspector).writingBase(false)
           .writingDeleteDelta(true)   // this is the option which will make it 
a delete writer
           .isCompressed(jobConf.getBoolean(IS_COMPRESSED, false))
-          .tableProperties(new 
StringableMap(jobConf.get(TABLE_PROPS)).toProperties())
-          .reporter(reporter)
+          .tableProperties(new 
StringableMap(jobConf.get(TABLE_PROPS)).toProperties()).reporter(reporter)
           .minimumWriteId(jobConf.getLong(MIN_TXN, Long.MAX_VALUE))
-          .maximumWriteId(jobConf.getLong(MAX_TXN, Long.MIN_VALUE))
-          .bucket(bucket)
+          .maximumWriteId(jobConf.getLong(MAX_TXN, 
Long.MIN_VALUE)).bucket(bucket)
           .statementId(-1)//setting statementId == -1 makes compacted delta 
files use
-            // delta_xxxx_yyyy format
+          // delta_xxxx_yyyy format
           .visibilityTxnId(getCompactorTxnId());
 
-        // Instantiate the underlying output format
-        @SuppressWarnings("unchecked")//since there is no way to parametrize 
instance of Class
-        AcidOutputFormat<WritableComparable, V> aof =
+      // Instantiate the underlying output format
+      @SuppressWarnings("unchecked")//since there is no way to parametrize 
instance of Class
+          AcidOutputFormat<WritableComparable, V> aof =
           instantiate(AcidOutputFormat.class, 
jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
 
-        deleteEventWriter = aof.getRawRecordWriter(new 
Path(jobConf.get(TMP_LOCATION)), options);
-      }
+      deleteEventWriter = aof.getRawRecordWriter(new 
Path(jobConf.get(TMP_LOCATION)), options);
+
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/7a3cac29/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index dc39f5e..546ff95 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -2249,6 +2249,32 @@ public class TestTxnCommands2 {
   }
 
   /**
+   * Tests that no delete_delta_x_y directories are produced during minor compaction when there are no input delete events.
+   * See HIVE-20941.
+   * @throws Exception
+   */
+  @Test
+  public void testDeleteEventsCompaction() throws Exception {
+    int[][] tableData1 = {{1, 2}};
+    int[][] tableData2 = {{2, 3}};
+    int[][] tableData3 = {{3, 4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + 
makeValuesClause(tableData1));
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + 
makeValuesClause(tableData2));
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + 
makeValuesClause(tableData3));
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+    txnHandler.compact(new CompactionRequest("default", 
Table.ACIDTBL.name().toLowerCase(), CompactionType.MINOR));
+    runWorker(hiveConf);
+    runCleaner(hiveConf);
+
+    FileSystem fs = FileSystem.get(hiveConf);
+    FileStatus[] fileStatuses = fs.globStatus(new Path(TEST_WAREHOUSE_DIR + 
"/" + Table.ACIDTBL.name().toLowerCase() + "/*"));
+    for(FileStatus fileStatus : fileStatuses) {
+      
Assert.assertFalse(fileStatus.getPath().getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX));
+    }
+  }
+
+  /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/7a3cac29/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 7535f84..1138f11 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -329,12 +329,9 @@ public class TestTxnCommands3 extends 
TxnCommandsBaseForTests {
     txnMgr2 = swapTxnManager(txnMgr1);
     driver2 = swapDrivers(driver1);
     runStatementOnDriver("alter table T compact 'minor'");//T4
-    TestTxnCommands2.runWorker(hiveConf);//makes delta_1_2 & delete_delta_1_2
+    TestTxnCommands2.runWorker(hiveConf);//makes delta_1_2
          /* Now we should have
      target/warehouse/t/
-     ├── delete_delta_0000001_0000002_v0000019
-     │   ├── _orc_acid_version
-     │   └── bucket_00000
      ├── delta_0000001_0000001_0000
      │   ├── _orc_acid_version
      │   └── bucket_00000
@@ -350,7 +347,6 @@ public class TestTxnCommands3 extends 
TxnCommandsBaseForTests {
         FileUtils.HIDDEN_FILES_PATH_FILTER);
 
     String[] expectedList = new String[] {
-        "/t/delete_delta_0000001_0000002_v0000019",
         "/t/delta_0000001_0000002_v0000019",
         "/t/delta_0000001_0000001_0000",
         "/t/delta_0000002_0000002_0000",
@@ -370,8 +366,7 @@ public class TestTxnCommands3 extends 
TxnCommandsBaseForTests {
     txnMgr1 = swapTxnManager(txnMgr2);
     driver1 = swapDrivers(driver2);
     runStatementOnDriver("commit");//commits T3
-    //so now cleaner should be able to delete delta_0000001_0000001_0000
-    // & delta_0000002_0000002_0000
+    //so now cleaner should be able to delete delta_0000002_0000002_0000
 
     //insert a row so that compactor makes a new delta (due to HIVE-20901)
     runStatementOnDriver("insert into T values(2,5)");//makes delta_3_3 in T5
@@ -379,13 +374,12 @@ public class TestTxnCommands3 extends 
TxnCommandsBaseForTests {
     runStatementOnDriver("alter table T compact 'minor'");
     TestTxnCommands2.runWorker(hiveConf);
     /*
-    at this point delete|delta_0000001_0000003_v0000022 are visible to everyone
-    so cleaner removes all files shadowed by them (which is everything in this 
case)
+    at this point delta_0000001_0000003_v0000022 is visible to everyone
+    so cleaner removes all files shadowed by it (which is everything in this 
case)
     */
     TestTxnCommands2.runCleaner(hiveConf);
 
     expectedList = new String[] {
-        "/t/delete_delta_0000001_0000003_v0000022",
         "/t/delta_0000001_0000003_v0000022"
     };
     actualList = fs.listStatus(new Path(warehousePath + "/t"),

Reply via email to