This is an automated email from the ASF dual-hosted git repository. daijy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new a20ffca HIVE-21625: Fix TxnIdUtils.checkEquivalentWriteIds, also provides a comparison method (Daniel Dai, reviewed by Jason Dere) a20ffca is described below commit a20ffcaf6d9c5639402f003236398a06e2177924 Author: Daniel Dai <dai...@gmail.com> AuthorDate: Mon May 6 21:35:09 2019 -0700 HIVE-21625: Fix TxnIdUtils.checkEquivalentWriteIds, also provides a comparison method (Daniel Dai, reviewed by Jason Dere) --- .../org/apache/hive/common/util/TxnIdUtils.java | 95 ++++++++++++---------- .../apache/hive/common/util/TestTxnIdUtils.java | 77 ++++++++++++++++++ 2 files changed, 128 insertions(+), 44 deletions(-) diff --git a/storage-api/src/java/org/apache/hive/common/util/TxnIdUtils.java b/storage-api/src/java/org/apache/hive/common/util/TxnIdUtils.java index 4b3cb7d..bd972d4 100644 --- a/storage-api/src/java/org/apache/hive/common/util/TxnIdUtils.java +++ b/storage-api/src/java/org/apache/hive/common/util/TxnIdUtils.java @@ -19,62 +19,69 @@ package org.apache.hive.common.util; import org.apache.hadoop.hive.common.ValidWriteIdList; -import java.util.*; - public class TxnIdUtils { /** * Check if 2 ValidWriteIdLists are at an equivalent commit point. */ public static boolean checkEquivalentWriteIds(ValidWriteIdList a, ValidWriteIdList b) { + return compare(a, b) == 0; + } + + /*** Compare the freshness of two ValidWriteIdList + * @param a + * @param b + * @return 0, if a and b are equivalent + * 1, if a is more recent + * -1, if b is more recent + ***/ + public static int compare(ValidWriteIdList a, ValidWriteIdList b) { if (!a.getTableName().equalsIgnoreCase(b.getTableName())) { - return false; + return a.getTableName().toLowerCase().compareTo(b.getTableName().toLowerCase()); } - ValidWriteIdList newer = a; - ValidWriteIdList older = b; - if (a.getHighWatermark() < b.getHighWatermark()) { - newer = b; - older = a; + // The algorithm assumes invalidWriteIds are sorted and values are less or equal than hwm, here is how + // the algorithm works: + // 1. Compare two invalidWriteIds until one the list ends, difference means the mismatch writeid is + // committed in one ValidWriteIdList but not the other, the comparison end + // 2. Every writeid from the last writeid in the short invalidWriteIds till its hwm should be committed + // in the other ValidWriteIdList, otherwise the comparison end + // 3. Every writeid from lower hwm to higher hwm should be invalid, otherwise, the comparison end + int minLen = Math.min(a.getInvalidWriteIds().length, b.getInvalidWriteIds().length); + for (int i=0;i<minLen;i++) { + if (a.getInvalidWriteIds()[i] == b.getInvalidWriteIds()[i]) { + continue; + } + return a.getInvalidWriteIds()[i] > b.getInvalidWriteIds()[i]?1:-1; } - - return checkEquivalentCommittedIds( - older.getHighWatermark(), older.getInvalidWriteIds(), - newer.getHighWatermark(), newer.getInvalidWriteIds()); - } - - /** - * Check the min open ID/highwater mark/exceptions list to see if 2 ID lists are at the same commit point. - * This can also be used for ValidTxnList as well as ValidWriteIdList. - */ - private static boolean checkEquivalentCommittedIds( - long oldHWM, long[] oldInvalidIds, - long newHWM, long[] newInvalidIds) { - - // There should be no valid txns in newer list that are not also in older. - // - All values in oldInvalidIds should also be in newInvalidIds. - // - if oldHWM < newHWM, then all IDs between oldHWM .. newHWM should exist in newInvalidTxns. - // A Gap in the sequence means a committed txn in newer list (lists are not equivalent) - - if (newInvalidIds.length < oldInvalidIds.length) { - return false; + if (a.getInvalidWriteIds().length == b.getInvalidWriteIds().length) { + return Long.signum(a.getHighWatermark() - b.getHighWatermark()); } - - // Check that the values in the older list are also in newer. Lists should already be sorted. - for (int idx = 0; idx < oldInvalidIds.length; ++idx) { - if (oldInvalidIds[idx] != newInvalidIds[idx]) { - return false; + if (a.getInvalidWriteIds().length == minLen) { + if (a.getHighWatermark() != b.getInvalidWriteIds()[minLen] -1) { + return Long.signum(a.getHighWatermark() - (b.getInvalidWriteIds()[minLen] -1)); + } + if (allInvalidFrom(b.getInvalidWriteIds(), minLen, b.getHighWatermark())) { + return 0; + } else { + return -1; + } + } else { + if (b.getHighWatermark() != a.getInvalidWriteIds()[minLen] -1) { + return Long.signum(b.getHighWatermark() - (a.getInvalidWriteIds()[minLen] -1)); + } + if (allInvalidFrom(a.getInvalidWriteIds(), minLen, a.getHighWatermark())) { + return 0; + } else { + return 1; } } - - // If older committed state is equivalent to newer state, then there should be no committed IDs - // between oldHWM and newHWM, and newInvalidIds should have exactly (newHWM - oldHWM) - // more entries than oldInvalidIds. - long oldNewListSizeDifference = newInvalidIds.length - oldInvalidIds.length; - long oldNewHWMDifference = newHWM - oldHWM; - if (oldNewHWMDifference != oldNewListSizeDifference) { - return false; + } + private static boolean allInvalidFrom(long[] invalidIds, int start, long hwm) { + for (int i=start+1;i<invalidIds.length;i++) { + if (invalidIds[i] != (invalidIds[i-1]+1)) { + return false; + } } - - return true; + return invalidIds[invalidIds.length-1] == hwm; } } diff --git a/storage-api/src/test/org/apache/hive/common/util/TestTxnIdUtils.java b/storage-api/src/test/org/apache/hive/common/util/TestTxnIdUtils.java index 3d8f329..ab5a472 100644 --- a/storage-api/src/test/org/apache/hive/common/util/TestTxnIdUtils.java +++ b/storage-api/src/test/org/apache/hive/common/util/TestTxnIdUtils.java @@ -21,6 +21,7 @@ import java.util.BitSet; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.junit.Test; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -114,4 +115,80 @@ public class TestTxnIdUtils { assertFalse(TxnIdUtils.checkEquivalentWriteIds(id3, id5)); assertFalse(TxnIdUtils.checkEquivalentWriteIds(id4, id5)); } + + @Test + public void testCompareWriteIds() throws Exception { + // same writeids + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {}, new BitSet(), 5), + new ValidReaderWriteIdList("default.table1", new long[] {}, new BitSet(), 5)), + 0); + + // b has high hwm + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {}, new BitSet(), 4), + new ValidReaderWriteIdList("default.table1", new long[] {}, new BitSet(), 5)), + -1); + + // a has a hole in invalid writeids + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {1,2,4,5}, new BitSet(), 5), + new ValidReaderWriteIdList("default.table1", new long[] {1,2,3,4,5}, new BitSet(), 5)), + 1); + + // a has high hwm + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {2,4,5}, new BitSet(), 6), + new ValidReaderWriteIdList("default.table1", new long[] {2,4,5}, new BitSet(), 5)), + 1); + + // non are committed + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {1,2,3}, new BitSet(), 3), + new ValidReaderWriteIdList("default.table1", new long[] {1,2,3,4,5}, new BitSet(), 5)), + 0); + + // all invalid above lower hwm + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {1,2,3}, new BitSet(), 9), + new ValidReaderWriteIdList("default.table1", new long[] {1,2,3,10,11}, new BitSet(), 11)), + 0); + + // same invalid writeids, different hwm + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {1,2,3}, new BitSet(), 9), + new ValidReaderWriteIdList("default.table1", new long[] {1,2,3}, new BitSet(), 11)), + -1); + + // all invalid above lower hwm + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {}, new BitSet(), 9), + new ValidReaderWriteIdList("default.table1", new long[] {10,11}, new BitSet(), 11)), + 0); + + // not all invalid above lower hwm + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {}, new BitSet(), 9), + new ValidReaderWriteIdList("default.table1", new long[] {10}, new BitSet(), 11)), + -1); + + // not all invalid above lower hwm + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {8}, new BitSet(), 9), + new ValidReaderWriteIdList("default.table1", new long[] {8}, new BitSet(), 11)), + -1); + + // all invalid above lower hwm + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {8}, new BitSet(), 9), + new ValidReaderWriteIdList("default.table1", new long[] {8,10,11}, new BitSet(), 11)), + 0); + + // table name is different + assertEquals(TxnIdUtils.compare( + new ValidReaderWriteIdList("default.table1", new long[] {8,10,11}, new BitSet(), 11), + new ValidReaderWriteIdList("default.table2", new long[] {8,10,11}, new BitSet(), 11)), + -1); + + } }