Author: gates
Date: Mon Jan 26 21:21:45 2015
New Revision: 1654899

URL: http://svn.apache.org/r1654899
Log:
HIVE-8966 Delta files created by hive hcatalog streaming cannot be compacted 
(Jihong Liu and Alan Gates, reviewed by Owen O'Malley)

Added:
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
    hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
    hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
    hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
    hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java

Added: hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java?rev=1654899&view=auto
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java (added)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidReadTxnList.java Mon Jan 26 21:21:45 2015
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.util.Arrays;
+
+/**
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by readers.
+ * This class will view a transaction as valid only if it is committed.  Both open and aborted
+ * transactions will be seen as invalid.
+ */
+public class ValidReadTxnList implements ValidTxnList {
+
+  protected long[] exceptions;
+  protected long highWatermark;
+
+  public ValidReadTxnList() {
+    this(new long[0], Long.MAX_VALUE);
+  }
+
+  public ValidReadTxnList(long[] exceptions, long highWatermark) {
+    if (exceptions.length == 0) {
+      this.exceptions = exceptions;
+    } else {
+      this.exceptions = exceptions.clone();
+      Arrays.sort(this.exceptions);
+    }
+    this.highWatermark = highWatermark;
+  }
+
+  public ValidReadTxnList(String value) {
+    readFromString(value);
+  }
+
+  @Override
+  public boolean isTxnValid(long txnid) {
+    if (highWatermark < txnid) {
+      return false;
+    }
+    return Arrays.binarySearch(exceptions, txnid) < 0;
+  }
+
+  @Override
+  public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
+    // check the easy cases first
+    if (highWatermark < minTxnId) {
+      return RangeResponse.NONE;
+    } else if (exceptions.length > 0 && exceptions[0] > maxTxnId) {
+      return RangeResponse.ALL;
+    }
+
+    // since the exceptions and the range in question overlap, count the
+    // exceptions in the range
+    long count = Math.max(0, maxTxnId - highWatermark);
+    for(long txn: exceptions) {
+      if (minTxnId <= txn && txn <= maxTxnId) {
+        count += 1;
+      }
+    }
+
+    if (count == 0) {
+      return RangeResponse.ALL;
+    } else if (count == (maxTxnId - minTxnId + 1)) {
+      return RangeResponse.NONE;
+    } else {
+      return RangeResponse.SOME;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return writeToString();
+  }
+
+  @Override
+  public String writeToString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append(highWatermark);
+    if (exceptions.length == 0) {
+      buf.append(':');
+    } else {
+      for(long except: exceptions) {
+        buf.append(':');
+        buf.append(except);
+      }
+    }
+    return buf.toString();
+  }
+
+  @Override
+  public void readFromString(String src) {
+    if (src == null) {
+      highWatermark = Long.MAX_VALUE;
+      exceptions = new long[0];
+    } else {
+      String[] values = src.split(":");
+      highWatermark = Long.parseLong(values[0]);
+      exceptions = new long[values.length - 1];
+      for(int i = 1; i < values.length; ++i) {
+        exceptions[i-1] = Long.parseLong(values[i]);
+      }
+    }
+  }
+
+  @Override
+  public long getHighWatermark() {
+    return highWatermark;
+  }
+
+  @Override
+  public long[] getInvalidTransactions() {
+    return exceptions;
+  }
+}
+
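Below is a brief usage sketch (not part of this commit) showing how the new read-side list behaves and how it round-trips through the colon-separated string form carried in the job conf under ValidTxnList.VALID_TXNS_KEY; it relies only on the ValidReadTxnList API added above.

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

public class ReadTxnListSketch {
  public static void main(String[] args) {
    // High watermark 7, with txns 3 and 5 still open or aborted.
    ValidTxnList txns = new ValidReadTxnList(new long[]{3L, 5L}, 7L);
    System.out.println(txns.writeToString());    // 7:3:5
    System.out.println(txns.isTxnValid(4));      // true  - committed and below the high watermark
    System.out.println(txns.isTxnValid(5));      // false - in the exception list
    System.out.println(txns.isTxnValid(8));      // false - above the high watermark

    // Round-trips through the same string form readers pull out of the conf.
    ValidTxnList copy = new ValidReadTxnList("7:3:5");
    System.out.println(copy.getHighWatermark()); // 7
  }
}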

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/common/ValidTxnList.java Mon Jan 26 21:21:45 2015
@@ -38,21 +38,23 @@ public interface ValidTxnList {
   public enum RangeResponse {NONE, SOME, ALL};
 
   /**
-   * Indicates whether a given transaction has been committed and should be
-   * viewed as valid for read.
+   * Indicates whether a given transaction is valid. Note that valid may have different meanings
+   * for different implementations, as some will only want to see committed transactions and some
+   * both committed and aborted.
    * @param txnid id for the transaction
-   * @return true if committed, false otherwise
+   * @return true if valid, false otherwise
    */
-  public boolean isTxnCommitted(long txnid);
+  public boolean isTxnValid(long txnid);
 
   /**
-   * Find out if a range of transaction ids have been committed.
+   * Find out if a range of transaction ids are valid.  Note that valid may have different meanings
+   * for different implementations, as some will only want to see committed transactions and some
+   * both committed and aborted.
    * @param minTxnId minimum txnid to look for, inclusive
    * @param maxTxnId maximum txnid to look for, inclusive
-   * @return Indicate whether none, some, or all of these transactions have been
-   * committed.
+   * @return Indicate whether none, some, or all of these transactions are valid.
    */
-  public RangeResponse isTxnRangeCommitted(long minTxnId, long maxTxnId);
+  public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId);
 
   /**
    * Write this validTxnList into a string. This should produce a string that
@@ -74,9 +76,10 @@ public interface ValidTxnList {
   public long getHighWatermark();
 
   /**
-   * Get the list of transactions under the high water mark that are still
-   * open.
-   * @return a list of open transaction ids
+   * Get the list of transactions under the high water mark that are not valid.  Note that invalid
+   * may have different meanings for different implementations, as some will only want to see open
+   * transactions and some both open and aborted.
+   * @return a list of invalid transaction ids
    */
-  public long[] getOpenTransactions();
+  public long[] getInvalidTransactions();
 }

Added: hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java?rev=1654899&view=auto
==============================================================================
--- hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java (added)
+++ hive/trunk/common/src/test/org/apache/hadoop/hive/common/TestValidReadTxnList.java Mon Jan 26 21:21:45 2015
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+
+/**
+ * Tests for {@link ValidReadTxnList}
+ */
+public class TestValidReadTxnList {
+
+  @Test
+  public void noExceptions() throws Exception {
+    ValidTxnList txnList = new ValidReadTxnList(new long[0], 1);
+    String str = txnList.writeToString();
+    Assert.assertEquals("1:", str);
+    ValidTxnList newList = new ValidReadTxnList();
+    newList.readFromString(str);
+    Assert.assertTrue(newList.isTxnValid(1));
+    Assert.assertFalse(newList.isTxnValid(2));
+  }
+
+  @Test
+  public void exceptions() throws Exception {
+    ValidTxnList txnList = new ValidReadTxnList(new long[]{2L,4L}, 5);
+    String str = txnList.writeToString();
+    Assert.assertEquals("5:2:4", str);
+    ValidTxnList newList = new ValidReadTxnList();
+    newList.readFromString(str);
+    Assert.assertTrue(newList.isTxnValid(1));
+    Assert.assertFalse(newList.isTxnValid(2));
+    Assert.assertTrue(newList.isTxnValid(3));
+    Assert.assertFalse(newList.isTxnValid(4));
+    Assert.assertTrue(newList.isTxnValid(5));
+    Assert.assertFalse(newList.isTxnValid(6));
+  }
+
+  @Test
+  public void longEnoughToCompress() throws Exception {
+    long[] exceptions = new long[1000];
+    for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
+    ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000);
+    String str = txnList.writeToString();
+    ValidTxnList newList = new ValidReadTxnList();
+    newList.readFromString(str);
+    for (int i = 0; i < 100; i++) Assert.assertTrue(newList.isTxnValid(i));
+    for (int i = 100; i < 1100; i++) Assert.assertFalse(newList.isTxnValid(i));
+    for (int i = 1100; i < 2001; i++) Assert.assertTrue(newList.isTxnValid(i));
+    Assert.assertFalse(newList.isTxnValid(2001));
+  }
+
+  @Test
+  public void readWriteConfig() throws Exception {
+    long[] exceptions = new long[1000];
+    for (int i = 0; i < 1000; i++) exceptions[i] = i + 100;
+    ValidTxnList txnList = new ValidReadTxnList(exceptions, 2000);
+    String str = txnList.writeToString();
+    Configuration conf = new Configuration();
+    conf.set(ValidTxnList.VALID_TXNS_KEY, str);
+    File tmpFile = File.createTempFile("TestValidTxnImpl", "readWriteConfig");
+    DataOutputStream out = new DataOutputStream(new FileOutputStream(tmpFile));
+    conf.write(out);
+    out.close();
+    DataInputStream in = new DataInputStream(new FileInputStream(tmpFile));
+    Configuration newConf = new Configuration();
+    newConf.readFields(in);
+    Assert.assertEquals(str, newConf.get(ValidTxnList.VALID_TXNS_KEY));
+  }
+}

Modified: hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java (original)
+++ hive/trunk/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/StreamingIntegrationTester.java Mon Jan 26 21:21:45 2015
@@ -107,7 +107,7 @@ public class StreamingIntegrationTester
          .hasArgs()
          .withArgName("partition-values")
          .withDescription("partition values, must be provided in order of 
partition columns, " +
-                 "if not provided table is assumed to not be partitioned")
+             "if not provided table is assumed to not be partitioned")
          .withLongOpt("partition")
          .withValueSeparator(',')
          .create('p'));
@@ -264,7 +264,7 @@ public class StreamingIntegrationTester
       this.batches = batches;
       this.writerNumber = writerNumber;
       this.recordsPerTxn = recordsPerTxn;
-      this.frequency = frequency;
+      this.frequency = frequency * 1000;
       this.abortPct = abortPct;
       this.cols = cols;
       this.types = types;
@@ -279,8 +279,8 @@ public class StreamingIntegrationTester
         conn = endPoint.newConnection(true);
         RecordWriter writer = new DelimitedInputWriter(cols, ",", endPoint);
 
-        long start = System.currentTimeMillis();
         for (int i = 0; i < batches; i++) {
+          long start = System.currentTimeMillis();
           LOG.info("Starting batch " + i);
           TransactionBatch batch = conn.fetchTransactionBatch(txnsPerBatch, writer);
           try {

Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java Mon Jan 26 21:21:45 2015
@@ -19,12 +19,8 @@ package org.apache.hadoop.hive.metastore
 
 import junit.framework.Assert;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.LockComponentBuilder;
-import org.apache.hadoop.hive.metastore.LockRequestBuilder;
 import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
@@ -69,10 +65,10 @@ public class TestHiveMetaStoreTxns {
     client.rollbackTxn(1);
     client.commitTxn(2);
     ValidTxnList validTxns = client.getValidTxns();
-    Assert.assertFalse(validTxns.isTxnCommitted(1));
-    Assert.assertTrue(validTxns.isTxnCommitted(2));
-    Assert.assertFalse(validTxns.isTxnCommitted(3));
-    Assert.assertFalse(validTxns.isTxnCommitted(4));
+    Assert.assertFalse(validTxns.isTxnValid(1));
+    Assert.assertTrue(validTxns.isTxnValid(2));
+    Assert.assertFalse(validTxns.isTxnValid(3));
+    Assert.assertFalse(validTxns.isTxnValid(4));
   }
 
   @Test
@@ -84,17 +80,17 @@ public class TestHiveMetaStoreTxns {
     client.rollbackTxn(1);
     client.commitTxn(2);
     ValidTxnList validTxns = client.getValidTxns(3);
-    Assert.assertFalse(validTxns.isTxnCommitted(1));
-    Assert.assertTrue(validTxns.isTxnCommitted(2));
-    Assert.assertTrue(validTxns.isTxnCommitted(3));
-    Assert.assertFalse(validTxns.isTxnCommitted(4));
+    Assert.assertFalse(validTxns.isTxnValid(1));
+    Assert.assertTrue(validTxns.isTxnValid(2));
+    Assert.assertTrue(validTxns.isTxnValid(3));
+    Assert.assertFalse(validTxns.isTxnValid(4));
   }
 
   @Test
   public void testTxnRange() throws Exception {
     ValidTxnList validTxns = client.getValidTxns();
     Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
-        validTxns.isTxnRangeCommitted(1L, 3L));
+        validTxns.isTxnRangeValid(1L, 3L));
     List<Long> tids = client.openTxns("me", 5).getTxn_ids();
 
     HeartbeatTxnRangeResponse rsp = client.heartbeatTxnRange(1, 5);
@@ -108,43 +104,43 @@ public class TestHiveMetaStoreTxns {
     validTxns = client.getValidTxns();
     System.out.println("validTxns = " + validTxns);
     Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeCommitted(2L, 2L));
+        validTxns.isTxnRangeValid(2L, 2L));
     Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeCommitted(2L, 3L));
+        validTxns.isTxnRangeValid(2L, 3L));
     Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeCommitted(2L, 4L));
+        validTxns.isTxnRangeValid(2L, 4L));
     Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeCommitted(3L, 4L));
+        validTxns.isTxnRangeValid(3L, 4L));
 
     Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeCommitted(1L, 4L));
+        validTxns.isTxnRangeValid(1L, 4L));
     Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeCommitted(2L, 5L));
+        validTxns.isTxnRangeValid(2L, 5L));
     Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeCommitted(1L, 2L));
+        validTxns.isTxnRangeValid(1L, 2L));
     Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeCommitted(4L, 5L));
+        validTxns.isTxnRangeValid(4L, 5L));
 
     Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
-        validTxns.isTxnRangeCommitted(1L, 1L));
+        validTxns.isTxnRangeValid(1L, 1L));
     Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
-        validTxns.isTxnRangeCommitted(5L, 10L));
+        validTxns.isTxnRangeValid(5L, 10L));
 
-    validTxns = new ValidTxnListImpl("10:4:5:6");
+    validTxns = new ValidReadTxnList("10:4:5:6");
     Assert.assertEquals(ValidTxnList.RangeResponse.NONE,
-        validTxns.isTxnRangeCommitted(4,6));
+        validTxns.isTxnRangeValid(4,6));
     Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeCommitted(7, 10));
+        validTxns.isTxnRangeValid(7, 10));
     Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeCommitted(7, 11));
+        validTxns.isTxnRangeValid(7, 11));
     Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeCommitted(3, 6));
+        validTxns.isTxnRangeValid(3, 6));
     Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeCommitted(4, 7));
+        validTxns.isTxnRangeValid(4, 7));
     Assert.assertEquals(ValidTxnList.RangeResponse.SOME,
-        validTxns.isTxnRangeCommitted(1, 12));
+        validTxns.isTxnRangeValid(1, 12));
     Assert.assertEquals(ValidTxnList.RangeResponse.ALL,
-        validTxns.isTxnRangeCommitted(1, 3));
+        validTxns.isTxnRangeValid(1, 3));
   }
 
   @Test
@@ -219,32 +215,32 @@ public class TestHiveMetaStoreTxns {
   @Test
   public void stringifyValidTxns() throws Exception {
     // Test with just high water mark
-    ValidTxnList validTxns = new ValidTxnListImpl("1:");
+    ValidTxnList validTxns = new ValidReadTxnList("1:");
     String asString = validTxns.toString();
     Assert.assertEquals("1:", asString);
-    validTxns = new ValidTxnListImpl(asString);
+    validTxns = new ValidReadTxnList(asString);
     Assert.assertEquals(1, validTxns.getHighWatermark());
-    Assert.assertNotNull(validTxns.getOpenTransactions());
-    Assert.assertEquals(0, validTxns.getOpenTransactions().length);
+    Assert.assertNotNull(validTxns.getInvalidTransactions());
+    Assert.assertEquals(0, validTxns.getInvalidTransactions().length);
     asString = validTxns.toString();
     Assert.assertEquals("1:", asString);
-    validTxns = new ValidTxnListImpl(asString);
+    validTxns = new ValidReadTxnList(asString);
     Assert.assertEquals(1, validTxns.getHighWatermark());
-    Assert.assertNotNull(validTxns.getOpenTransactions());
-    Assert.assertEquals(0, validTxns.getOpenTransactions().length);
+    Assert.assertNotNull(validTxns.getInvalidTransactions());
+    Assert.assertEquals(0, validTxns.getInvalidTransactions().length);
 
     // Test with open transactions
-    validTxns = new ValidTxnListImpl("10:5:3");
+    validTxns = new ValidReadTxnList("10:5:3");
     asString = validTxns.toString();
     if (!asString.equals("10:3:5") && !asString.equals("10:5:3")) {
       Assert.fail("Unexpected string value " + asString);
     }
-    validTxns = new ValidTxnListImpl(asString);
+    validTxns = new ValidReadTxnList(asString);
     Assert.assertEquals(10, validTxns.getHighWatermark());
-    Assert.assertNotNull(validTxns.getOpenTransactions());
-    Assert.assertEquals(2, validTxns.getOpenTransactions().length);
+    Assert.assertNotNull(validTxns.getInvalidTransactions());
+    Assert.assertEquals(2, validTxns.getInvalidTransactions().length);
     boolean sawThree = false, sawFive = false;
-    for (long tid : validTxns.getOpenTransactions()) {
+    for (long tid : validTxns.getInvalidTransactions()) {
       if (tid == 3)  sawThree = true;
       else if (tid == 5) sawFive = true;
       else  Assert.fail("Unexpected value " + tid);

Modified: hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java (original)
+++ hive/trunk/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java Mon Jan 26 21:21:45 2015
@@ -1,8 +1,12 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -15,20 +19,28 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
 import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.apache.hive.hcatalog.streaming.StreamingConnection;
+import org.apache.hive.hcatalog.streaming.StreamingException;
 import org.apache.hive.hcatalog.streaming.TransactionBatch;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -184,7 +196,7 @@ public class TestCompactor {
     Assert.assertEquals("numNdv a", 1, colAStats.getNumDVs());
     StringColumnStatsData colBStats = colStats.get(1).getStatsData().getStringStats();
     Assert.assertEquals("maxColLen b", 3, colBStats.getMaxColLen());
-    Assert.assertEquals("avgColLen b", 3.0, colBStats.getAvgColLen());
+    Assert.assertEquals("avgColLen b", 3.0, colBStats.getAvgColLen(), 0.01);
     Assert.assertEquals("numNulls b", 0, colBStats.getNumNulls());
     Assert.assertEquals("nunDVs", 2, colBStats.getNumDVs());
 
@@ -268,6 +280,299 @@ public class TestCompactor {
       colBStatsPart2, colStats.get(1).getStatsData().getStringStats());
   }
 
+  @Test
+  public void minorCompactWhileStreaming() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to 
be bucketed
+        " STORED AS ORC", driver);
+
+    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+    StreamingConnection connection = endPt.newConnection(false);
+    try {
+      // Write a couple of batches
+      for (int i = 0; i < 2; i++) {
+        writeBatch(connection, writer, false);
+      }
+
+      // Start a third batch, but don't close it.
+      writeBatch(connection, writer, true);
+
+      // Now, compact
+      CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+      Worker t = new Worker();
+      t.setThreadId((int) t.getId());
+      t.setHiveConf(conf);
+      AtomicBoolean stop = new AtomicBoolean(true);
+      AtomicBoolean looped = new AtomicBoolean();
+      t.init(stop, looped);
+      t.run();
+
+      // Find the location of the table
+      IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+      Table table = msClient.getTable(dbName, tblName);
+      FileSystem fs = FileSystem.get(conf);
+      FileStatus[] stat =
+          fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+      String[] names = new String[stat.length];
+      Path resultFile = null;
+      for (int i = 0; i < names.length; i++) {
+        names[i] = stat[i].getPath().getName();
+        if (names[i].equals("delta_0000001_0000004")) {
+          resultFile = stat[i].getPath();
+        }
+      }
+      Arrays.sort(names);
+      Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
+          "delta_0000001_0000004", "delta_0000003_0000004", 
"delta_0000005_0000006"});
+      checkExpectedTxnsPresent(null, new Path[]{resultFile}, 0, 1L, 4L);
+
+    } finally {
+      connection.close();
+    }
+  }
+
+  @Test
+  public void majorCompactWhileStreaming() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to 
be bucketed
+        " STORED AS ORC", driver);
+
+    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+    StreamingConnection connection = endPt.newConnection(false);
+    try {
+      // Write a couple of batches
+      for (int i = 0; i < 2; i++) {
+        writeBatch(connection, writer, false);
+      }
+
+      // Start a third batch, but don't close it.
+      writeBatch(connection, writer, true);
+
+      // Now, compact
+      CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+      Worker t = new Worker();
+      t.setThreadId((int) t.getId());
+      t.setHiveConf(conf);
+      AtomicBoolean stop = new AtomicBoolean(true);
+      AtomicBoolean looped = new AtomicBoolean();
+      t.init(stop, looped);
+      t.run();
+
+      // Find the location of the table
+      IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+      Table table = msClient.getTable(dbName, tblName);
+      FileSystem fs = FileSystem.get(conf);
+      FileStatus[] stat =
+          fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
+      Assert.assertEquals(1, stat.length);
+      String name = stat[0].getPath().getName();
+      Assert.assertEquals(name, "base_0000004");
+      checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+    } finally {
+      connection.close();
+    }
+  }
+
+  @Test
+  public void minorCompactAfterAbort() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to 
be bucketed
+        " STORED AS ORC", driver);
+
+    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+    StreamingConnection connection = endPt.newConnection(false);
+    try {
+      // Write a couple of batches
+      for (int i = 0; i < 2; i++) {
+        writeBatch(connection, writer, false);
+      }
+
+      // Start a third batch, abort everything, don't properly close it
+      TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+      txnBatch.beginNextTransaction();
+      txnBatch.abort();
+      txnBatch.beginNextTransaction();
+      txnBatch.abort();
+
+      // Now, compact
+      CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MINOR));
+      Worker t = new Worker();
+      t.setThreadId((int) t.getId());
+      t.setHiveConf(conf);
+      AtomicBoolean stop = new AtomicBoolean(true);
+      AtomicBoolean looped = new AtomicBoolean();
+      t.init(stop, looped);
+      t.run();
+
+      // Find the location of the table
+      IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+      Table table = msClient.getTable(dbName, tblName);
+      FileSystem fs = FileSystem.get(conf);
+      FileStatus[] stat =
+          fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.deltaFileFilter);
+      String[] names = new String[stat.length];
+      Path resultDelta = null;
+      for (int i = 0; i < names.length; i++) {
+        names[i] = stat[i].getPath().getName();
+        if (names[i].equals("delta_0000001_0000006")) {
+          resultDelta = stat[i].getPath();
+        }
+      }
+      Arrays.sort(names);
+      Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002",
+          "delta_0000001_0000006", "delta_0000003_0000004", 
"delta_0000005_0000006"});
+      checkExpectedTxnsPresent(null, new Path[]{resultDelta}, 0, 1L, 4L);
+    } finally {
+      connection.close();
+    }
+  }
+
+  @Test
+  public void majorCompactAfterAbort() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+    List<String> colNames = Arrays.asList("a", "b");
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to 
be bucketed
+        " STORED AS ORC", driver);
+
+    HiveEndPoint endPt = new HiveEndPoint(null, dbName, tblName, null);
+    DelimitedInputWriter writer = new DelimitedInputWriter(new String[] {"a","b"},",", endPt);
+    StreamingConnection connection = endPt.newConnection(false);
+    try {
+      // Write a couple of batches
+      for (int i = 0; i < 2; i++) {
+        writeBatch(connection, writer, false);
+      }
+
+      // Start a third batch, but don't close it.
+      TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+      txnBatch.beginNextTransaction();
+      txnBatch.abort();
+      txnBatch.beginNextTransaction();
+      txnBatch.abort();
+
+
+      // Now, compact
+      CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf);
+      txnHandler.compact(new CompactionRequest(dbName, tblName, CompactionType.MAJOR));
+      Worker t = new Worker();
+      t.setThreadId((int) t.getId());
+      t.setHiveConf(conf);
+      AtomicBoolean stop = new AtomicBoolean(true);
+      AtomicBoolean looped = new AtomicBoolean();
+      t.init(stop, looped);
+      t.run();
+
+      // Find the location of the table
+      IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+      Table table = msClient.getTable(dbName, tblName);
+      FileSystem fs = FileSystem.get(conf);
+      FileStatus[] stat =
+          fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter);
+      Assert.assertEquals(1, stat.length);
+      String name = stat[0].getPath().getName();
+      Assert.assertEquals(name, "base_0000006");
+      checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L);
+    } finally {
+      connection.close();
+    }
+  }
+  private void writeBatch(StreamingConnection connection, DelimitedInputWriter writer,
+                          boolean closeEarly)
+      throws InterruptedException, StreamingException {
+    TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer);
+    txnBatch.beginNextTransaction();
+    txnBatch.write("50,Kiev".getBytes());
+    txnBatch.write("51,St. Petersburg".getBytes());
+    txnBatch.write("44,Boston".getBytes());
+    txnBatch.commit();
+
+    if (!closeEarly) {
+      txnBatch.beginNextTransaction();
+      txnBatch.write("52,Tel Aviv".getBytes());
+      txnBatch.write("53,Atlantis".getBytes());
+      txnBatch.write("53,Boston".getBytes());
+      txnBatch.commit();
+
+      txnBatch.close();
+    }
+  }
+
+  private void checkExpectedTxnsPresent(Path base, Path[] deltas, int bucket, long min, long max)
+      throws IOException {
+    ValidTxnList txnList = new ValidTxnList() {
+      @Override
+      public boolean isTxnValid(long txnid) {
+        return true;
+      }
+
+      @Override
+      public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
+        return RangeResponse.ALL;
+      }
+
+      @Override
+      public String writeToString() {
+        return "";
+      }
+
+      @Override
+      public void readFromString(String src) {
+
+      }
+
+      @Override
+      public long getHighWatermark() {
+        return  Long.MAX_VALUE;
+      }
+
+      @Override
+      public long[] getInvalidTransactions() {
+        return new long[0];
+      }
+    };
+
+    OrcInputFormat aif = new OrcInputFormat();
+
+    AcidInputFormat.RawReader<OrcStruct> reader =
+        aif.getRawReader(new Configuration(), false, bucket, txnList, base, deltas);
+    RecordIdentifier identifier = reader.createKey();
+    OrcStruct value = reader.createValue();
+    long currentTxn = min;
+    boolean seenCurrentTxn = false;
+    while (reader.next(identifier, value)) {
+      if (!seenCurrentTxn) {
+        Assert.assertEquals(currentTxn, identifier.getTransactionId());
+        seenCurrentTxn = true;
+      }
+      if (currentTxn != identifier.getTransactionId()) {
+        Assert.assertEquals(currentTxn + 1, identifier.getTransactionId());
+        currentTxn++;
+      }
+    }
+    Assert.assertEquals(max, currentTxn);
+  }
+
   /**
    * convenience method to execute a select stmt and dump results to log file
    */

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java Mon Jan 26 21:21:45 2015
@@ -1747,12 +1747,12 @@ public class HiveMetaStoreClient impleme
 
   @Override
   public ValidTxnList getValidTxns() throws TException {
-    return TxnHandler.createValidTxnList(client.get_open_txns(), 0);
+    return TxnHandler.createValidReadTxnList(client.get_open_txns(), 0);
   }
 
   @Override
   public ValidTxnList getValidTxns(long currentTxn) throws TException {
-    return TxnHandler.createValidTxnList(client.get_open_txns(), currentTxn);
+    return TxnHandler.createValidReadTxnList(client.get_open_txns(), currentTxn);
   }
 
   @Override

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java Mon Jan 26 21:21:45 2015
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.util.StringUtils;
@@ -576,6 +577,25 @@ public class CompactionTxnHandler extend
       return findColumnsWithStats(ci);
     }
   }
+
+  /**
+   * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse} to a
+   * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that the caller intends to
+   * compact the files, and thus treats only open transactions as invalid.
+   * @param txns txn list from the metastore
+   * @return a valid txn list.
+   */
+  public static ValidTxnList createValidCompactTxnList(GetOpenTxnsInfoResponse txns) {
+    long highWater = txns.getTxn_high_water_mark();
+    long minOpenTxn = Long.MAX_VALUE;
+    long[] exceptions = new long[txns.getOpen_txnsSize()];
+    int i = 0;
+    for (TxnInfo txn : txns.getOpen_txns()) {
+      if (txn.getState() == TxnState.OPEN) minOpenTxn = Math.min(minOpenTxn, txn.getId());
+      exceptions[i++] = txn.getId();
+    }
+    return new ValidCompactorTxnList(exceptions, minOpenTxn, highWater);
+  }
 }
 
 

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java Mon Jan 26 21:21:45 2015
@@ -29,7 +29,7 @@ import org.apache.commons.dbcp.PoolingDa
 import org.apache.commons.pool.ObjectPool;
 import org.apache.commons.pool.impl.GenericObjectPool;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -249,14 +249,15 @@ public class TxnHandler {
 
   /**
    * Transform a {@link org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse} to a
-   * {@link org.apache.hadoop.hive.common.ValidTxnList}.
+   * {@link org.apache.hadoop.hive.common.ValidTxnList}.  This assumes that the caller intends to
+   * read the files, and thus treats both open and aborted transactions as invalid.
    * @param txns txn list from the metastore
    * @param currentTxn Current transaction that the user has open.  If this is greater than 0 it
    *                   will be removed from the exceptions list so that the user sees his own
    *                   transaction as valid.
    * @return a valid txn list.
    */
-  public static ValidTxnList createValidTxnList(GetOpenTxnsResponse txns, long currentTxn) {
+  public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
     long highWater = txns.getTxn_high_water_mark();
     Set<Long> open = txns.getOpen_txns();
     long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
@@ -265,7 +266,7 @@ public class TxnHandler {
       if (currentTxn > 0 && currentTxn == txn) continue;
       exceptions[i++] = txn;
     }
-    return new ValidTxnListImpl(exceptions, highWater);
+    return new ValidReadTxnList(exceptions, highWater);
   }
 
   public OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException {

Added: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java?rev=1654899&view=auto
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java (added)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/ValidCompactorTxnList.java Mon Jan 26 21:21:45 2015
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.txn;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+
+import java.util.Arrays;
+
+/**
+ * An implementation of {@link org.apache.hadoop.hive.common.ValidTxnList} for use by the compactor.
+ * For the purposes of {@link #isTxnRangeValid} this class will view a transaction as valid if it
+ * is committed or aborted.  Additionally it will return NONE if there are any open transactions
+ * below the max transaction given, since we don't want to compact above open transactions.  For
+ * {@link #isTxnValid} it will still view a transaction as valid only if it is committed.  These
+ * produce the logic we need to assure that the compactor only sees records less than the lowest
+ * open transaction when choosing which files to compact, but that it still ignores aborted
+ * records when compacting.
+ */
+public class ValidCompactorTxnList extends ValidReadTxnList {
+
+  // The minimum open transaction id
+  private long minOpenTxn;
+
+  public ValidCompactorTxnList() {
+    super();
+    minOpenTxn = -1;
+  }
+
+  /**
+   *
+   * @param exceptions list of all open and aborted transactions
+   * @param minOpen lowest open transaction
+   * @param highWatermark highest committed transaction
+   */
+  public ValidCompactorTxnList(long[] exceptions, long minOpen, long highWatermark) {
+    super(exceptions, highWatermark);
+    minOpenTxn = minOpen;
+  }
+
+  public ValidCompactorTxnList(String value) {
+    super(value);
+  }
+
+  @Override
+  public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) {
+    if (highWatermark < minTxnId) {
+      return RangeResponse.NONE;
+    } else if (minOpenTxn < 0) {
+      return highWatermark >= maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
+    } else {
+      return minOpenTxn > maxTxnId ? RangeResponse.ALL : RangeResponse.NONE;
+    }
+  }
+
+  @Override
+  public String writeToString() {
+    StringBuilder buf = new StringBuilder();
+    buf.append(highWatermark);
+    buf.append(':');
+    buf.append(minOpenTxn);
+    if (exceptions.length == 0) {
+      buf.append(':');
+    } else {
+      for(long except: exceptions) {
+        buf.append(':');
+        buf.append(except);
+      }
+    }
+    return buf.toString();
+  }
+
+  @Override
+  public void readFromString(String src) {
+    if (src == null) {
+      highWatermark = Long.MAX_VALUE;
+      exceptions = new long[0];
+    } else {
+      String[] values = src.split(":");
+      highWatermark = Long.parseLong(values[0]);
+      minOpenTxn = Long.parseLong(values[1]);
+      exceptions = new long[values.length - 2];
+      for(int i = 2; i < values.length; ++i) {
+        exceptions[i-2] = Long.parseLong(values[i]);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  long getMinOpenTxn() {
+    return minOpenTxn;
+  }
+}
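A brief sketch (not part of this commit) of the compactor-side semantics defined above; it relies only on the ValidCompactorTxnList API in this file. Ranges at or above the lowest open transaction report NONE so the compactor will not compact past them, while individual aborted transactions still read as invalid so their records are dropped.

import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;

public class CompactorTxnListSketch {
  public static void main(String[] args) {
    // Open txn 9 caps what may be compacted; aborted txns 7 and 10 are merely skipped.
    ValidCompactorTxnList txns = new ValidCompactorTxnList(new long[]{7L, 9L, 10L}, 9L, 37L);
    System.out.println(txns.isTxnRangeValid(1L, 8L));  // ALL  - range sits entirely below the lowest open txn
    System.out.println(txns.isTxnRangeValid(8L, 12L)); // NONE - overlaps open txn 9, so not compacted
    System.out.println(txns.isTxnValid(7L));           // false - aborted records are still ignored
    System.out.println(txns.writeToString());          // 37:9:7:9:10
  }
}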

Added: hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java?rev=1654899&view=auto
==============================================================================
--- hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java (added)
+++ hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/txn/TestValidCompactorTxnList.java Mon Jan 26 21:21:45 2015
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestValidCompactorTxnList {
+
+  @Test
+  public void minTxnHigh() {
+    ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 4}, 3, 5);
+    ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+    Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
+  }
+
+  @Test
+  public void maxTxnLow() {
+    ValidTxnList txns = new ValidCompactorTxnList(new long[]{13, 14}, 13, 15);
+    ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+    Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp);
+  }
+
+  @Test
+  public void minTxnHighNoExceptions() {
+    ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 5);
+    ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+    Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
+  }
+
+  @Test
+  public void maxTxnLowNoExceptions() {
+    ValidTxnList txns = new ValidCompactorTxnList(new long[0], -1, 15);
+    ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+    Assert.assertEquals(ValidTxnList.RangeResponse.ALL, rsp);
+  }
+
+  @Test
+  public void exceptionsAllBelow() {
+    ValidTxnList txns = new ValidCompactorTxnList(new long[]{3, 6}, 3, 15);
+    ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+    Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
+  }
+
+  @Test
+  public void exceptionsInMidst() {
+    ValidTxnList txns = new ValidCompactorTxnList(new long[]{8}, 8, 15);
+    ValidTxnList.RangeResponse rsp = txns.isTxnRangeValid(7, 9);
+    Assert.assertEquals(ValidTxnList.RangeResponse.NONE, rsp);
+  }
+
+  @Test
+  public void writeToString() {
+    ValidTxnList txns = new ValidCompactorTxnList(new long[]{9, 7, 10}, 9, 37);
+    Assert.assertEquals("37:9:7:9:10", txns.writeToString());
+    txns = new ValidCompactorTxnList();
+    Assert.assertEquals(Long.toString(Long.MAX_VALUE) + ":-1:", txns.writeToString());
+    txns = new ValidCompactorTxnList(new long[0], -1, 23);
+    Assert.assertEquals("23:-1:", txns.writeToString());
+  }
+
+  @Test
+  public void readFromString() {
+    ValidCompactorTxnList txns = new ValidCompactorTxnList("37:9:7:9:10");
+    Assert.assertEquals(37L, txns.getHighWatermark());
+    Assert.assertEquals(9L, txns.getMinOpenTxn());
+    Assert.assertArrayEquals(new long[]{7L, 9L, 10L}, txns.getInvalidTransactions());
+    txns = new ValidCompactorTxnList("21:-1:");
+    Assert.assertEquals(21L, txns.getHighWatermark());
+    Assert.assertEquals(-1L, txns.getMinOpenTxn());
+    Assert.assertEquals(0, txns.getInvalidTransactions().length);
+
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Mon Jan 26 21:21:45 2015
@@ -43,7 +43,14 @@ public class AcidUtils {
   // This key will be put in the conf file when planning an acid operation
   public static final String CONF_ACID_KEY = "hive.doing.acid";
   public static final String BASE_PREFIX = "base_";
+  public static final PathFilter baseFileFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path path) {
+      return path.getName().startsWith(BASE_PREFIX);
+    }
+  };
   public static final String DELTA_PREFIX = "delta_";
+  public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
   public static final PathFilter deltaFileFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -54,7 +61,8 @@ public class AcidUtils {
   public static final PathFilter bucketFileFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
-      return path.getName().startsWith(BUCKET_PREFIX);
+      return path.getName().startsWith(BUCKET_PREFIX) &&
+          !path.getName().endsWith(DELTA_SIDE_FILE_SUFFIX);
     }
   };
   public static final String BUCKET_DIGITS = "%05d";
@@ -369,7 +377,7 @@ public class AcidUtils {
         }
       } else if (fn.startsWith(DELTA_PREFIX) && child.isDir()) {
         ParsedDelta delta = parseDelta(child);
-        if (txnList.isTxnRangeCommitted(delta.minTransaction,
+        if (txnList.isTxnRangeValid(delta.minTransaction,
             delta.maxTransaction) !=
             ValidTxnList.RangeResponse.NONE) {
           working.add(delta);
@@ -402,7 +410,7 @@ public class AcidUtils {
     for(ParsedDelta next: working) {
       if (next.maxTransaction > current) {
         // are any of the new transactions ones that we care about?
-        if (txnList.isTxnRangeCommitted(current+1, next.maxTransaction) !=
+        if (txnList.isTxnRangeValid(current+1, next.maxTransaction) !=
             ValidTxnList.RangeResponse.NONE) {
           deltas.add(next);
           current = next.maxTransaction;
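As an illustration of the new filters (a sketch, not from the patch; the table location is hypothetical), baseFileFilter and deltaFileFilter can be used to list the base and delta directories of an ACID table, much as the compactor tests above do:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;

public class ListAcidDirsSketch {
  public static void main(String[] args) throws Exception {
    Path tableDir = new Path("/warehouse/cws");          // hypothetical table location
    FileSystem fs = FileSystem.get(new Configuration());
    for (FileStatus s : fs.listStatus(tableDir, AcidUtils.deltaFileFilter)) {
      System.out.println("delta: " + s.getPath().getName());  // e.g. delta_0000001_0000002
    }
    for (FileStatus s : fs.listStatus(tableDir, AcidUtils.baseFileFilter)) {
      System.out.println("base:  " + s.getPath().getName());  // e.g. base_0000004
    }
  }
}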

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Mon Jan 26 21:21:45 2015
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -408,7 +408,7 @@ public class OrcInputFormat  implements
       }
       String value = conf.get(ValidTxnList.VALID_TXNS_KEY,
                               Long.MAX_VALUE + ":");
-      transactionList = new ValidTxnListImpl(value);
+      transactionList = new ValidReadTxnList(value);
     }
 
     int getSchedulers() {
@@ -1132,7 +1132,7 @@ public class OrcInputFormat  implements
     }
     String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
                                 Long.MAX_VALUE + ":");
-    ValidTxnList validTxnList = new ValidTxnListImpl(txnString);
+    ValidTxnList validTxnList = new ValidReadTxnList(txnString);
     final OrcRawRecordMerger records =
         new OrcRawRecordMerger(conf, true, reader, split.isOriginal(), bucket,
             validTxnList, readOptions, deltas);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java Mon Jan 26 21:21:45 2015
@@ -575,7 +575,7 @@ public class OrcRawRecordMerger implemen
       }
 
       // if this transaction isn't ok, skip over it
-      if (!validTxnList.isTxnCommitted(
+      if (!validTxnList.isTxnValid(
           ((ReaderKey) recordIdentifier).getCurrentTransactionId())) {
         continue;
       }
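
The rename from isTxnCommitted to isTxnValid reflects that the merger can now be
driven by either flavour of list: for a query, "valid" still means committed; for
the compactor, it additionally excludes anything at or above the lowest open
transaction. A small sketch of the reader-side behaviour, assuming the
"<highWatermark>:<openTxnIds>" string form used by the new tests ("100:4" = high
water mark 100, transaction 4 still open); the expected results are inferred from
those tests rather than stated in this hunk:

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

// Reader-side validity sketch; expected values are assumptions.
public class ReaderValiditySketch {
  public static void main(String[] args) {
    ValidTxnList txns = new ValidReadTxnList("100:4");
    System.out.println(txns.isTxnValid(3));    // expected true: committed
    System.out.println(txns.isTxnValid(4));    // expected false: still open
    System.out.println(txns.isTxnValid(101));  // expected false: above the high water mark
  }
}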

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java 
(original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java 
Mon Jan 26 21:21:45 2015
@@ -129,7 +129,7 @@ public class OrcRecordUpdater implements
   }
 
   static Path getSideFile(Path main) {
-    return new Path(main + "_flush_length");
+    return new Path(main + AcidUtils.DELTA_SIDE_FILE_SUFFIX);
   }
 
   static int getOperation(OrcStruct struct) {

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java 
(original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java 
Mon Jan 26 21:21:45 2015
@@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.lockmg
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.Context;
@@ -199,7 +199,7 @@ class DummyTxnManager extends HiveTxnMan
 
   @Override
   public ValidTxnList getValidTxns() throws LockException {
-    return new ValidTxnListImpl();
+    return new ValidReadTxnList();
   }
 
   @Override

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java 
Mon Jan 26 21:21:45 2015
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
@@ -183,7 +183,7 @@ public class Cleaner extends CompactorTh
       // Create a bogus validTxnList with a high water mark set to MAX_LONG 
and no open
       // transactions.  This assures that all deltas are treated as valid and 
all we return are
       // obsolete files.
-      final ValidTxnList txnList = new ValidTxnListImpl();
+      final ValidTxnList txnList = new ValidReadTxnList();
 
       if (runJobAsSelf(ci.runAs)) {
         removeFiles(location, txnList);
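
The comment above spells out what the no-argument constructor is for: a list whose
high water mark is Long.MAX_VALUE with no open transactions, so every delta counts
as valid and getAcidState only has to report what is obsolete. A sketch of the
equivalence, under the assumption that the no-arg constructor and the maximal
string form behave identically:

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

// Assumption: new ValidReadTxnList() is shorthand for a list with high water
// mark Long.MAX_VALUE and no exceptions, matching the Cleaner's comment.
public class EverythingValidSketch {
  public static void main(String[] args) {
    ValidTxnList noArg   = new ValidReadTxnList();
    ValidTxnList maximal = new ValidReadTxnList(Long.MAX_VALUE + ":");
    System.out.println(noArg.isTxnValid(123456789L));    // expected true
    System.out.println(maximal.isTxnValid(123456789L));  // expected true
  }
}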

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java 
(original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java 
Mon Jan 26 21:21:45 2015
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -159,7 +159,7 @@ public class CompactorMR {
 
     if (parsedDeltas == null || parsedDeltas.size() == 0) {
       // Seriously, no deltas?  Can't compact that.
-      LOG.error("No delta files found to compact in " + sd.getLocation());
+      LOG.error(  "No delta files found to compact in " + sd.getLocation());
       return;
     }
 
@@ -505,7 +505,7 @@ public class CompactorMR {
       AcidInputFormat<WritableComparable, V> aif =
           instantiate(AcidInputFormat.class, 
jobConf.get(INPUT_FORMAT_CLASS_NAME));
       ValidTxnList txnList =
-          new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
+          new ValidReadTxnList(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
 
       boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
       AcidInputFormat.RawReader<V> reader =

Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java 
(original)
+++ 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java 
Mon Jan 26 21:21:45 2015
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -77,7 +78,8 @@ public class Initiator extends Compactor
         // don't doom the entire thread.
         try {
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new 
ShowCompactRequest());
-          ValidTxnList txns = 
TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0);
+          ValidTxnList txns =
+              
CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
           Set<CompactionInfo> potentials = 
txnHandler.findPotentialCompactions(abortedThreshold);
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");

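Switching the Initiator (and the Worker below) to
CompactionTxnHandler.createValidCompactTxnList(getOpenTxnsInfo()) is the heart of
the fix: the compactor must not roll up deltas that reach past the lowest open
transaction, otherwise a compacted file could swallow data a concurrent reader
still considers open. The new TestAcidUtils cases later in this commit pin the
behaviour down; a sketch of the contrast, with the expected responses inferred
from those tests:

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;

// Same serialized list ("100:4", transaction 4 open), two different answers.
public class ReadVsCompactSketch {
  public static void main(String[] args) {
    ValidTxnList read    = new ValidReadTxnList("100:4");
    ValidTxnList compact = new ValidCompactorTxnList("100:4");

    // A delta covering transactions 2..5 straddles the open txn 4.
    // A reader can still use it (txns 2, 3 and 5 are visible) ...
    System.out.println(read.isTxnRangeValid(2, 5)
        != ValidTxnList.RangeResponse.NONE);     // expected true
    // ... but the compactor must leave it alone until txn 4 resolves.
    System.out.println(compact.isTxnRangeValid(2, 5)
        != ValidTxnList.RangeResponse.NONE);     // expected false
  }
}
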
Modified: 
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java 
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java 
Mon Jan 26 21:21:45 2015
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler;
 import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
@@ -121,7 +122,8 @@ public class Worker extends CompactorThr
 
         final boolean isMajor = ci.isMajorCompaction();
         final ValidTxnList txns =
-            TxnHandler.createValidTxnList(txnHandler.getOpenTxns(), 0);
+            
CompactionTxnHandler.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
+        LOG.debug("ValidCompactTxnList: " + txns.writeToString());
         final StringBuffer jobName = new StringBuffer(name);
         jobName.append("-compactor-");
         jobName.append(ci.getFullPartitionName());
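
The new debug line uses writeToString(), the same serialized form that the string
constructors parse, so the list the Worker computes can be logged and later rebuilt
from its string form (the CompactorMR change above parses the same key back out of
the job conf). A minimal round-trip sketch; the exact serialized text is an
assumption, only the symmetry is relied on:

import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;

// Serialize, rebuild from the string, and expect matching answers.
public class TxnListRoundTripSketch {
  public static void main(String[] args) {
    ValidTxnList original = new ValidReadTxnList("100:4");
    ValidTxnList copy     = new ValidReadTxnList(original.writeToString());
    System.out.println(original.isTxnValid(4)  == copy.isTxnValid(4));   // expected true
    System.out.println(original.isTxnValid(50) == copy.isTxnValid(50));  // expected true
  }
}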

Modified: 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java 
(original)
+++ 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java 
Mon Jan 26 21:21:45 2015
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
@@ -51,7 +51,6 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
@@ -114,7 +113,7 @@ public class TestFileSinkOperator {
         "testFileSinkOperator");
     tmpdir.mkdir();
     tmpdir.deleteOnExit();
-    txnList = new ValidTxnListImpl(new long[]{}, 2);
+    txnList = new ValidReadTxnList(new long[]{}, 2);
   }
 
   @Test

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java 
(original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java Mon 
Jan 26 21:21:45 2015
@@ -20,7 +20,8 @@ package org.apache.hadoop.hive.ql.io;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.metastore.txn.ValidCompactorTxnList;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
 import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
@@ -91,7 +92,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0]));
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(new MockPath(fs, "/tbl/part1"), conf,
-            new ValidTxnListImpl("100:"));
+            new ValidReadTxnList("100:"));
     assertEquals(null, dir.getBaseDirectory());
     assertEquals(0, dir.getCurrentDirectories().size());
     assertEquals(0, dir.getObsolete().size());
@@ -121,7 +122,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_101_101/bucket_0", 0, new 
byte[0]));
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
-            "mock:/tbl/part1"), conf, new ValidTxnListImpl("100:"));
+            "mock:/tbl/part1"), conf, new ValidReadTxnList("100:"));
     assertEquals(null, dir.getBaseDirectory());
     List<FileStatus> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
@@ -162,7 +163,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_90_120/bucket_0", 0, new byte[0]));
     AcidUtils.Directory dir =
         AcidUtils.getAcidState(new TestInputOutputFormat.MockPath(fs,
-            "mock:/tbl/part1"), conf, new ValidTxnListImpl("100:"));
+            "mock:/tbl/part1"), conf, new ValidReadTxnList("100:"));
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
     List<FileStatus> obsolete = dir.getObsolete();
     assertEquals(5, obsolete.size());
@@ -191,7 +192,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/base_200/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:"));
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
     assertEquals("mock:/tbl/part1/base_200", 
dir.getBaseDirectory().toString());
     List<FileStatus> obsoletes = dir.getObsolete();
     assertEquals(4, obsoletes.size());
@@ -202,7 +203,7 @@ public class TestAcidUtils {
     assertEquals(0, dir.getOriginalFiles().size());
     assertEquals(0, dir.getCurrentDirectories().size());
     // we should always get the latest base
-    dir = AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("10:"));
+    dir = AcidUtils.getAcidState(part, conf, new ValidReadTxnList("10:"));
     assertEquals("mock:/tbl/part1/base_200", 
dir.getBaseDirectory().toString());
   }
 
@@ -216,7 +217,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/000001_1", 500, new byte[0]));
     Path part = new MockPath(fs, "/tbl/part1");
     AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("150:"));
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("150:"));
     // The two original buckets won't be in the obsolete list because we don't 
look at those
     // until we have determined there is no base.
     List<FileStatus> obsolete = dir.getObsolete();
@@ -239,7 +240,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/base_50/bucket_0", 500, new byte[0]));
     Path part = new MockPath(fs, "mock:/tbl/part1");
     AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidTxnListImpl("100:"));
+        AcidUtils.getAcidState(part, conf, new ValidReadTxnList("100:"));
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
     List<FileStatus> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
@@ -252,4 +253,51 @@ public class TestAcidUtils {
     assertEquals("mock:/tbl/part1/delta_000062_62", 
delts.get(2).getPath().toString());
     assertEquals("mock:/tbl/part1/delta_0000063_63", 
delts.get(3).getPath().toString());
   }
+
+  @Test
+  public void deltasWithOpenTxnInRead() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new 
ValidReadTxnList("100:4"));
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(2, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", 
delts.get(0).getPath().toString());
+    assertEquals("mock:/tbl/part1/delta_2_5", 
delts.get(1).getPath().toString());
+  }
+
+  @Test
+  public void deltasWithOpenTxnsNotInCompact() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4"));
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(1, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", 
delts.get(0).getPath().toString());
+  }
+
+  @Test
+  public void deltasWithOpenTxnsNotInCompact2() throws Exception {
+    Configuration conf = new Configuration();
+    MockFileSystem fs = new MockFileSystem(conf,
+        new MockFile("mock:/tbl/part1/delta_1_1/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0", 500, new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_2_5/bucket_0" + 
AcidUtils.DELTA_SIDE_FILE_SUFFIX, 500,
+            new byte[0]),
+        new MockFile("mock:/tbl/part1/delta_6_10/bucket_0", 500, new byte[0]));
+    Path part = new MockPath(fs, "mock:/tbl/part1");
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(part, conf, new ValidCompactorTxnList("100:4"));
+    List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
+    assertEquals(1, delts.size());
+    assertEquals("mock:/tbl/part1/delta_1_1", 
delts.get(0).getPath().toString());
+  }
+
+
 }

Modified: 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
 (original)
+++ 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
 Mon Jan 26 21:21:45 2015
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidTxnListImpl;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
@@ -301,7 +301,7 @@ public class TestOrcRawRecordMerger {
   }
 
   private static ValidTxnList createMaximalTxnList() {
-    return new ValidTxnListImpl(Long.MAX_VALUE + ":");
+    return new ValidReadTxnList(Long.MAX_VALUE + ":");
   }
 
   @Test
@@ -492,7 +492,7 @@ public class TestOrcRawRecordMerger {
         .maximumTransactionId(100);
     of.getRecordUpdater(root, options).close(false);
 
-    ValidTxnList txnList = new ValidTxnListImpl("200:");
+    ValidTxnList txnList = new ValidReadTxnList("200:");
     AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, 
txnList);
 
     Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
@@ -550,7 +550,7 @@ public class TestOrcRawRecordMerger {
     ru.delete(200, new MyRow("", 8, 0, BUCKET));
     ru.close(false);
 
-    ValidTxnList txnList = new ValidTxnListImpl("200:");
+    ValidTxnList txnList = new ValidReadTxnList("200:");
     AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, 
txnList);
 
     assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
@@ -734,7 +734,7 @@ public class TestOrcRawRecordMerger {
     merger.close();
 
     // try ignoring the 200 transaction and make sure it works still
-    ValidTxnList txns = new ValidTxnListImpl("2000:200");
+    ValidTxnList txns = new ValidReadTxnList("2000:200");
     merger =
         new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
             txns, new Reader.Options(),

Modified: 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
URL: 
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java?rev=1654899&r1=1654898&r2=1654899&view=diff
==============================================================================
--- 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
 (original)
+++ 
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
 Mon Jan 26 21:21:45 2015
@@ -51,6 +51,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.Stack;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -156,6 +157,11 @@ public abstract class CompactorTest {
     addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
   }
 
+  protected void addLengthFile(Table t, Partition p, long minTxn, long maxTxn, 
int numRecords)
+    throws Exception {
+    addFile(t, p, minTxn, maxTxn, numRecords, FileType.LENGTH_FILE, 2, true);
+  }
+
   protected void addBaseFile(Table t, Partition p, long maxTxn, int 
numRecords) throws Exception {
     addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
   }
@@ -185,9 +191,21 @@ public abstract class CompactorTest {
     return paths;
   }
 
-  protected void burnThroughTransactions(int num) throws MetaException, 
NoSuchTxnException, TxnAbortedException {
+  protected void burnThroughTransactions(int num)
+      throws MetaException, NoSuchTxnException, TxnAbortedException {
+    burnThroughTransactions(num, null, null);
+  }
+
+  protected void burnThroughTransactions(int num, Set<Long> open, Set<Long> 
aborted)
+      throws MetaException, NoSuchTxnException, TxnAbortedException {
     OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", 
"localhost"));
-    for (long tid : rsp.getTxn_ids()) txnHandler.commitTxn(new 
CommitTxnRequest(tid));
+    for (long tid : rsp.getTxn_ids()) {
+      if (aborted != null && aborted.contains(tid)) {
+        txnHandler.abortTxn(new AbortTxnRequest(tid));
+      } else if (open == null || (open != null && !open.contains(tid))) {
+        txnHandler.commitTxn(new CommitTxnRequest(tid));
+      }
+    }
   }
 
   protected void stopThread() {
@@ -249,7 +267,7 @@ public abstract class CompactorTest {
     return location;
   }
 
-  private enum FileType {BASE, DELTA, LEGACY};
+  private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE};
 
   private void addFile(Table t, Partition p, long minTxn, long maxTxn,
                        int numRecords,  FileType type, int numBuckets,
@@ -259,6 +277,7 @@ public abstract class CompactorTest {
     String filename = null;
     switch (type) {
       case BASE: filename = "base_" + maxTxn; break;
+      case LENGTH_FILE: // Fall through to delta
       case DELTA: filename = "delta_" + minTxn + "_" + maxTxn; break;
       case LEGACY: break; // handled below
     }
@@ -273,12 +292,19 @@ public abstract class CompactorTest {
         Path dir = new Path(location, filename);
         fs.mkdirs(dir);
         partFile = AcidUtils.createBucketFile(dir, bucket);
+        if (type == FileType.LENGTH_FILE) {
+          partFile = new Path(partFile.toString() + 
AcidUtils.DELTA_SIDE_FILE_SUFFIX);
+        }
       }
       FSDataOutputStream out = fs.create(partFile);
-      for (int i = 0; i < numRecords; i++) {
-        RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i);
-        ri.write(out);
-        out.writeBytes("mary had a little lamb its fleece was white as 
snow\n");
+      if (type == FileType.LENGTH_FILE) {
+        out.writeInt(numRecords);
+      } else {
+        for (int i = 0; i < numRecords; i++) {
+          RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i);
+          ri.write(out);
+          out.writeBytes("mary had a little lamb its fleece was white as 
snow\n");
+        }
       }
       out.close();
     }
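
The widened burnThroughTransactions overload lets a compactor test commit most
transactions while deliberately leaving some open or aborted, and addLengthFile
stages the streaming "_flush_length" side file next to a delta's bucket files. A
hedged usage fragment follows; it assumes the CompactorTest harness in this commit
(including its addDeltaFile helper and a no-arg constructor available to
subclasses) and a fresh transaction sequence, so the ids it hard-codes are
illustrative only:

import java.util.Collections;
import java.util.Set;

import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorTest;

// Hypothetical test fragment built on the CompactorTest harness: stage a
// streaming-style delta (with its _flush_length side file) and burn through
// 25 transactions while leaving txn 23 open, so a Worker or Initiator under
// test sees an open transaction inside the delta range.
abstract class StreamingCompactionSketch extends CompactorTest {
  StreamingCompactionSketch() throws Exception {
    // Assumes CompactorTest exposes a no-arg constructor to subclasses.
  }

  void stageOpenTxnDelta(Table t, Partition p) throws Exception {
    addBaseFile(t, p, 20L, 20);        // base covering txns up to 20
    addDeltaFile(t, p, 21L, 25L, 5);   // delta for txns 21-25
    addLengthFile(t, p, 21L, 25L, 5);  // its streaming side file
    Set<Long> open = Collections.singleton(23L);
    burnThroughTransactions(25, open, null);  // commit all but txn 23
  }
}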

