Repository: hive
Updated Branches:
  refs/heads/branch-2.1 320516dd4 -> c6e789f92


HIVE-13369 AcidUtils.getAcidState() is not paying attention to ValidTxnList when
choosing the best base file (Eugene Koifman, reviewed by Owen O'Malley)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c6e789f9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c6e789f9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c6e789f9

Branch: refs/heads/branch-2.1
Commit: c6e789f928761bd66cd0ffae1acef9ac37d49127
Parents: 320516d
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Mon Jul 18 14:45:47 2016 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Mon Jul 18 14:45:47 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |   2 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |  60 +++++++++--
 .../hive/ql/txn/AcidOpenTxnsCounterService.java |   8 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java | 100 +++++++++++++++++--
 4 files changed, 154 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
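The change in a nutshell: getAcidState() no longer blindly picks the base with the highest transaction id; a base_N is usable only when the reader's ValidTxnList shows every transaction in [1, N] as resolved, otherwise an older base (or none) is chosen. The plain-Java sketch below models that rule in isolation; SnapshotModel and isUsableBase are illustrative names, not Hive's ValidTxnList API.

import java.util.Arrays;

// Minimal standalone model of the rule this patch enforces (assumed names, no Hive dependency):
// a reader's snapshot is a high-water mark plus the txn ids excluded from it.
public class BaseChoiceSketch {
  static final class SnapshotModel {
    final long highWatermark;
    final long[] exceptions;   // open/aborted txn ids below the high-water mark

    SnapshotModel(long highWatermark, long... exceptions) {
      this.highWatermark = highWatermark;
      this.exceptions = exceptions.clone();
      Arrays.sort(this.exceptions);
    }

    // A base_N is usable only if every txn in [1, N] is committed from this reader's view.
    boolean isUsableBase(long baseTxnId) {
      if (baseTxnId == Long.MIN_VALUE) {
        return true;                        // base produced by non-acid to acid conversion
      }
      if (baseTxnId > highWatermark) {
        return false;                       // base contains txns this reader cannot see yet
      }
      for (long e : exceptions) {
        if (e >= 1 && e <= baseTxnId) {
          return false;                     // an unresolved txn falls inside the base's range
        }
      }
      return true;
    }
  }

  public static void main(String[] args) {
    SnapshotModel reader = new SnapshotModel(150, 121);  // mirrors ValidReadTxnList("150:121")
    System.out.println(reader.isUsableBase(100));        // true  -> base_100 can be chosen
    System.out.println(reader.isUsableBase(200));        // false -> base_200 lacks history
  }
}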


http://git-wip-us.apache.org/repos/asf/hive/blob/c6e789f9/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 52dadb7..6ed5b13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -449,6 +449,8 @@ public enum ErrorMsg {
   IMPORT_SEMANTIC_ERROR(10324, "Import Semantic Analyzer Error"),
   INVALID_FK_SYNTAX(10325, "Invalid Foreign Key syntax"),
   INVALID_PK_SYNTAX(10326, "Invalid Primary Key syntax"),
+  ACID_NOT_ENOUGH_HISTORY(10327, "Not enough history available for ({0},{1}).  " +
+    "Oldest available base: {2}", true),
   //========================== 20000 range starts here ========================//
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
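The new entry is a parameterized template ({0} = reader high-water mark, {1} = minimum open txn or "x", {2} = oldest base found). A rough illustration of how it renders, using java.text.MessageFormat directly rather than Hive's ErrorMsg.format() wrapper that the patch calls later; the wrapper's internals are assumed, not reproduced.

import java.text.MessageFormat;

// Hedged illustration of how the ACID_NOT_ENOUGH_HISTORY template renders; the arguments
// (125, 5, mock:/tbl/part1/base_5) are taken from the expectation in TestAcidUtils below.
public class AcidErrorMessageSketch {
  public static void main(String[] args) {
    String template = "Not enough history available for ({0},{1}).  Oldest available base: {2}";
    String msg = MessageFormat.format(template, "125", "5", "mock:/tbl/part1/base_5");
    System.out.println(msg);
    // Not enough history available for (125,5).  Oldest available base: mock:/tbl/part1/base_5
  }
}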

http://git-wip-us.apache.org/repos/asf/hive/blob/c6e789f9/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 25055c2..6b92413 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -475,7 +476,9 @@ public class AcidUtils {
   /** State class for getChildState; cannot modify 2 things in a method. */
   private static class TxnBase {
     private FileStatus status;
-    private long txn;
+    private long txn = 0;
+    private long oldestBaseTxnId = Long.MAX_VALUE;
+    private Path oldestBase = null;
   }
 
   /**
@@ -571,6 +574,21 @@ public class AcidUtils {
       }
     }
 
+    if(bestBase.oldestBase != null && bestBase.status == null) {
+      /**
+       * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
+       * {@link txnList}.  Note that 'original' files are logically a base_Long.MIN_VALUE and thus
+       * cannot have any data for an open txn.  We could check {@link deltas} has files to cover
+       * [1,n] w/o gaps but this would almost never happen...*/
+      //todo: this should only care about 'open' txns (HIVE-14211)
+      long[] exceptions = txnList.getInvalidTransactions();
+      String minOpenTxn = exceptions != null && exceptions.length > 0 ?
+        Long.toString(exceptions[0]) : "x";
+      throw new IOException(ErrorMsg.ACID_NOT_ENOUGH_HISTORY.format(
+        Long.toString(txnList.getHighWatermark()),
+        minOpenTxn, bestBase.oldestBase.toString()));
+    }
+
     final Path base = bestBase.status == null ? null : bestBase.status.getPath();
     LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
         deltas.size());
@@ -598,7 +616,26 @@ public class AcidUtils {
       }
     };
   }
-
+  /**
+   * We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view)
+   * A 'base' with open txn in its range doesn't have 'enough history' info to produce a correct
+   * snapshot for this reader.
+   * Note that such base is NOT obsolete.  Obsolete files are those that are "covered" by other
+   * files within the snapshot.
+   */
+  private static boolean isValidBase(long baseTxnId, ValidTxnList txnList) {
+    /*This implementation is suboptimal.  It considers open/aborted txns invalid while we are only
+    * concerned with 'open' ones.  (Compaction removes any data that belongs to aborted txns and
+    * reads skip anything that belongs to aborted txn, thus base_7 is still OK if the only exception
+    * is txn 5 which is aborted).  So this implementation can generate false positives. (HIVE-14211)
+    * */
+    if(baseTxnId == Long.MIN_VALUE) {
+      //such base is created by 1st compaction in case of non-acid to acid table conversion
+      //By definition there are no open txns with id < 1.
+      return true;
+    }
+    return ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeValid(1, baseTxnId);
+  }
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
       ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
       List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles) {
@@ -606,13 +643,22 @@ public class AcidUtils {
     String fn = p.getName();
     if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
       long txn = parseBase(p);
+      if(bestBase.oldestBaseTxnId > txn) {
+        //keep track for error reporting
+        bestBase.oldestBase = p;
+        bestBase.oldestBaseTxnId = txn;
+      }
       if (bestBase.status == null) {
-        bestBase.status = child;
-        bestBase.txn = txn;
+        if(isValidBase(txn, txnList)) {
+          bestBase.status = child;
+          bestBase.txn = txn;
+        }
       } else if (bestBase.txn < txn) {
-        obsolete.add(bestBase.status);
-        bestBase.status = child;
-        bestBase.txn = txn;
+        if(isValidBase(txn, txnList)) {
+          obsolete.add(bestBase.status);
+          bestBase.status = child;
+          bestBase.txn = txn;
+        }
       } else {
         obsolete.add(child);
       }
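Putting the getChildState() changes together: every base_N is recorded as a candidate for the "oldest base" (kept only for error reporting), but it is promoted to the chosen base, and its predecessor demoted to obsolete, only when it passes the isValidBase() check. A simplified re-creation in plain Java, with assumed names and a long[] standing in for the directory listing and ValidTxnList:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Simplified re-creation of the selection pass (illustrative only, not AcidUtils itself).
public class BaseSelectionSketch {
  // Returns the chosen base txn id, or -1 if the directory has no base at all;
  // throws when bases exist but none has enough history for the reader.
  static long chooseBase(long[] baseTxnIds, long highWatermark, long[] openTxns) throws IOException {
    long bestBase = -1;
    long oldestBase = Long.MAX_VALUE;
    List<Long> obsolete = new ArrayList<>();
    for (long base : baseTxnIds) {
      oldestBase = Math.min(oldestBase, base);       // tracked only for error reporting
      if (bestBase == -1) {
        if (isValidBase(base, highWatermark, openTxns)) {
          bestBase = base;
        }
      } else if (bestBase < base) {
        if (isValidBase(base, highWatermark, openTxns)) {
          obsolete.add(bestBase);                    // older valid base is now covered
          bestBase = base;
        }
      } else {
        obsolete.add(base);
      }
    }
    if (bestBase == -1 && oldestBase != Long.MAX_VALUE) {
      throw new IOException("Not enough history available; oldest base_" + oldestBase);
    }
    System.out.println("obsolete bases: " + obsolete);
    return bestBase;
  }

  // Same rule as isValidBase() in the diff: all of [1, base] must be resolved for this reader.
  static boolean isValidBase(long base, long highWatermark, long[] openTxns) {
    if (base == Long.MIN_VALUE) return true;
    if (base > highWatermark) return false;
    for (long t : openTxns) {
      if (t >= 1 && t <= base) return false;
    }
    return true;
  }

  public static void main(String[] args) throws IOException {
    // base_5..base_200 with snapshot "150:" -> base_100 is chosen, base_200 is skipped
    System.out.println("chosen: base_" + chooseBase(new long[]{5, 10, 25, 100, 200}, 150, new long[0]));
  }
}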

http://git-wip-us.apache.org/repos/asf/hive/blob/c6e789f9/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
index f5eb8a1..08fcff4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidOpenTxnsCounterService.java
@@ -47,6 +47,7 @@ public class AcidOpenTxnsCounterService extends HouseKeeperServiceBase {
     return "Count number of open transactions";
   }
   private static final class OpenTxnsCounter implements Runnable {
+    private static volatile long lastLogTime = 0;
     private final TxnStore txnHandler;
     private final AtomicInteger isAliveCounter;
     private OpenTxnsCounter(HiveConf hiveConf, AtomicInteger isAliveCounter) {
@@ -59,7 +60,12 @@ public class AcidOpenTxnsCounterService extends HouseKeeperServiceBase {
         long startTime = System.currentTimeMillis();
         txnHandler.countOpenTxns();
         int count = isAliveCounter.incrementAndGet();
-        LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds.  isAliveCounter=" + count);
+        if(System.currentTimeMillis() - lastLogTime > 60*1000) {
+          //don't flood the logs with too many msgs
+          LOG.info("OpenTxnsCounter ran for " + (System.currentTimeMillis() - startTime) / 1000 +
+            "seconds.  isAliveCounter=" + count);
+            "seconds.  isAliveCounter=" + count);
+          lastLogTime = System.currentTimeMillis();
+        }
       }
       catch(Throwable t) {
         LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
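The logging change is a plain time-based throttle: the INFO line is emitted at most once per 60-second window, tracked in a static volatile timestamp. A generic sketch of the pattern (illustrative names, not Hive code); the check-then-set is not atomic, which is acceptable here since the worst case is an occasional extra log line.

// Generic sketch of the 60-second log throttle added above.
public class LogThrottleSketch {
  private static volatile long lastLogTime = 0;
  private static final long LOG_INTERVAL_MS = 60 * 1000;

  static void maybeLog(String msg) {
    long now = System.currentTimeMillis();
    if (now - lastLogTime > LOG_INTERVAL_MS) {
      System.out.println(msg);   // stands in for LOG.info(...)
      lastLogTime = now;
    }
  }

  public static void main(String[] args) {
    for (int i = 0; i < 5; i++) {
      maybeLog("OpenTxnsCounter ran, iteration " + i);  // only the first call prints
    }
  }
}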

http://git-wip-us.apache.org/repos/asf/hive/blob/c6e789f9/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 5745dee..2ff8f0d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -27,8 +27,10 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -217,25 +219,107 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/base_5/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/base_10/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_98_100/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_120_130/bucket_0", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
-    assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString());
+    assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString());
+    assertEquals(1, dir.getCurrentDirectories().size());
+    assertEquals("mock:/tbl/part1/delta_120_130",
+      dir.getCurrentDirectories().get(0).getPath().toString());
     List<FileStatus> obsoletes = dir.getObsolete();
     assertEquals(4, obsoletes.size());
     assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString());
-    assertEquals("mock:/tbl/part1/base_100", obsoletes.get(1).getPath().toString());
-    assertEquals("mock:/tbl/part1/base_25", obsoletes.get(2).getPath().toString());
-    assertEquals("mock:/tbl/part1/base_5", obsoletes.get(3).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString());
     assertEquals(0, dir.getOriginalFiles().size());
-    assertEquals(0, dir.getCurrentDirectories().size());
-    // we should always get the latest base
+
     dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:"));
-    assertEquals("mock:/tbl/part1/base_200", dir.getBaseDirectory().toString());
-  }
+    assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
+    assertEquals(0, dir.getCurrentDirectories().size());
+    obsoletes = dir.getObsolete();
+    assertEquals(1, obsoletes.size());
+    assertEquals("mock:/tbl/part1/base_5", obsoletes.get(0).getPath().toString());
+    assertEquals(0, dir.getOriginalFiles().size());
+
+    /*Single statement txns only: since we don't compact a txn range that includes an open txn,
+    the existence of delta_120_130 implies that 121 in the exception list is aborted unless
+    delta_120_130 is from streaming ingest in which case 121 can be open
+    (and thus 122-130 are open too)
+    For multi-statement txns, see HIVE-13369*/
+    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:121"));
+    assertEquals("mock:/tbl/part1/base_100", dir.getBaseDirectory().toString());
+    assertEquals(1, dir.getCurrentDirectories().size());
+    assertEquals("mock:/tbl/part1/delta_120_130",
+      dir.getCurrentDirectories().get(0).getPath().toString());
+    obsoletes = dir.getObsolete();
+    assertEquals(4, obsoletes.size());
+    assertEquals("mock:/tbl/part1/base_10", obsoletes.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_25", obsoletes.get(1).getPath().toString());
+    assertEquals("mock:/tbl/part1/base_5", obsoletes.get(2).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_98_100", obsoletes.get(3).getPath().toString());
+
+    boolean gotException = false;
+    try {
+      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("125:5"));
+    }
+    catch(IOException e) {
+      gotException = true;
+      Assert.assertEquals("Not enough history available for (125,5).  Oldest available base: " +
+        "mock:/tbl/part1/base_5", e.getMessage());
+    }
+    Assert.assertTrue("Expected exception", gotException);
+    
+    fs = new MockFileSystem(conf,
+      new MockFile("mock:/tbl/part1/delta_1_10/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_12_25/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]));
+    part = new MockPath(fs, "/tbl/part1");
+    try {
+      gotException = false;
+      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7"));
+    }
+    catch(IOException e) {
+      gotException = true;
+      Assert.assertEquals("Not enough history available for (150,7).  Oldest available base: " +
+        "mock:/tbl/part1/base_25", e.getMessage());
+    }
+    Assert.assertTrue("Expected exception", gotException);
+
+    fs = new MockFileSystem(conf,
+      new MockFile("mock:/tbl/part1/delta_2_10/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/base_25/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]));
+    part = new MockPath(fs, "/tbl/part1");
+    try {
+      gotException = false;
+      dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:7"));
+    }
+    catch(IOException e) {
+      gotException = true;
+      Assert.assertEquals("Not enough history available for (150,7).  Oldest available base: " +
+        "mock:/tbl/part1/base_25", e.getMessage());
+    }
+    Assert.assertTrue("Expected exception", gotException);
 
+    fs = new MockFileSystem(conf,
+      //non-acid to acid table conversion
+      new MockFile("mock:/tbl/part1/base_" + Long.MIN_VALUE + "/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+      new MockFile("mock:/tbl/part1/base_100/bucket_0", 500, new byte[0]));
+    part = new MockPath(fs, "/tbl/part1");
+    //note that we don't include current txn of the client in exception list to support read-your-writes
+    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("1:"));
+    assertEquals("mock:/tbl/part1/base_" + Long.MIN_VALUE, dir.getBaseDirectory().toString());
+    assertEquals(1, dir.getCurrentDirectories().size());
+    assertEquals("mock:/tbl/part1/delta_1_1", dir.getCurrentDirectories().get(0).getPath().toString());
+    assertEquals(0, dir.getObsolete().size());
+  }
   @Test
   public void testObsoleteOriginals() throws Exception {
     Configuration conf = new Configuration();
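For readers unfamiliar with the test fixtures: ValidReadTxnList is built from a snapshot string, where "150:" means high-water mark 150 with nothing excluded and "150:121" additionally marks txn 121 as open/aborted, as the new cases above rely on. A small hedged sketch of how those snapshots answer the range check behind isValidBase(); it assumes Hive's classes are on the classpath and relies only on behaviour exercised by the tests above.

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

// Sketch of the snapshot strings used by the new tests and the range check behind isValidBase().
public class SnapshotStringSketch {
  public static void main(String[] args) {
    ValidTxnList clean = new ValidReadTxnList("150:");      // everything up to 150 committed
    ValidTxnList open121 = new ValidReadTxnList("150:121"); // txn 121 still unresolved
    // base_100 only needs [1,100]; 121 lies outside that range, so the base stays usable.
    System.out.println(open121.isTxnRangeValid(1, 100));    // expected: ALL
    // base_200 would need txns beyond the 150 high-water mark, so it is rejected.
    System.out.println(clean.isTxnRangeValid(1, 200));      // expected: not ALL
  }
}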
