This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new d77f473100 PHOENIX-7314 Enable CompactionScanner for flushes and minor compaction (#1896) d77f473100 is described below commit d77f4731005c5a8ffb593c7c0affcad9903bb4ce Author: Kadir Ozdemir <37155482+kadiro...@users.noreply.github.com> AuthorDate: Fri Jun 7 23:00:24 2024 +0300 PHOENIX-7314 Enable CompactionScanner for flushes and minor compaction (#1896) --- .../coprocessor/BaseScannerRegionObserver.java | 48 +-- .../phoenix/coprocessor/CompactionScanner.java | 462 ++++++++++++++------- .../UngroupedAggregateRegionObserver.java | 219 ++++++---- .../phoenix/end2end/MaxLookbackExtendedIT.java | 44 +- .../org/apache/phoenix/end2end/TableTTLIT.java | 41 +- .../java/org/apache/phoenix/util/TestUtil.java | 1 - 6 files changed, 516 insertions(+), 299 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index d27a187dd1..b25acfb72c 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -357,21 +357,12 @@ abstract public class BaseScannerRegionObserver implements RegionObserver { dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex); } - public void setScanOptionsForFlushesAndCompactions(Store store, ScanOptions options, - boolean retainAllVersions) { + public void setScanOptionsForFlushesAndCompactions(ScanOptions options) { // We want the store to give us all the deleted cells to StoreCompactionScanner options.setKeepDeletedCells(KeepDeletedCells.TTL); options.setTTL(HConstants.FOREVER); - if (retainAllVersions) { - options.setMaxVersions(Integer.MAX_VALUE); - options.setMinVersions(Integer.MAX_VALUE); - } else { - options.setMinVersions(Math.max(Math.max(options.getMaxVersions(), - store.getColumnFamilyDescriptor().getMaxVersions()), 1)); - options.setMinVersions(Math.max(Math.max(options.getMinVersions(), - store.getColumnFamilyDescriptor().getMaxVersions()), 1)); - } - + options.setMaxVersions(Integer.MAX_VALUE); + options.setMinVersions(Integer.MAX_VALUE); } @Override @@ -380,16 +371,13 @@ abstract public class BaseScannerRegionObserver implements RegionObserver { CompactionRequest request) throws IOException { Configuration conf = c.getEnvironment().getConfiguration(); if (isPhoenixTableTTLEnabled(conf)) { - boolean retainAllVersions = isMaxLookbackTimeEnabled( - BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf)) - || request.isMajor(); - setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions); + setScanOptionsForFlushesAndCompactions(options); return; } - long maxLookbackAge = getMaxLookbackAge(c); - if (isMaxLookbackTimeEnabled(maxLookbackAge)) { + long maxLookbackAgeInMillis = getMaxLookbackAge(c); + if (isMaxLookbackTimeEnabled(maxLookbackAgeInMillis)) { setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store, - scanType, maxLookbackAge); + scanType, maxLookbackAgeInMillis); } } @@ -399,16 +387,14 @@ abstract public class BaseScannerRegionObserver implements RegionObserver { Configuration conf = c.getEnvironment().getConfiguration(); if (isPhoenixTableTTLEnabled(conf)) { - boolean retainAllVersions = isMaxLookbackTimeEnabled( - 
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf)); - setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions); + setScanOptionsForFlushesAndCompactions(options); return; } - long maxLookbackAge = getMaxLookbackAge(c); - if (isMaxLookbackTimeEnabled(maxLookbackAge)) { + long maxLookbackAgeInMillis = getMaxLookbackAge(c); + if (isMaxLookbackTimeEnabled(maxLookbackAgeInMillis)) { setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store, - ScanType.COMPACT_RETAIN_DELETES, maxLookbackAge); + ScanType.COMPACT_RETAIN_DELETES, maxLookbackAgeInMillis); } } @@ -418,13 +404,11 @@ abstract public class BaseScannerRegionObserver implements RegionObserver { throws IOException { Configuration conf = c.getEnvironment().getConfiguration(); if (isPhoenixTableTTLEnabled(conf)) { - boolean retainAllVersions = isMaxLookbackTimeEnabled( - BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf)); - setScanOptionsForFlushesAndCompactions(store, options, retainAllVersions); + setScanOptionsForFlushesAndCompactions(options); return; } - long maxLookbackAge = getMaxLookbackAge(c); - if (isMaxLookbackTimeEnabled(maxLookbackAge)) { + long maxLookbackAgeInMillis = getMaxLookbackAge(c); + if (isMaxLookbackTimeEnabled(maxLookbackAgeInMillis)) { MemoryCompactionPolicy inMemPolicy = store.getColumnFamilyDescriptor().getInMemoryCompaction(); ScanType scanType; @@ -437,7 +421,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver { scanType = ScanType.COMPACT_RETAIN_DELETES; } setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, store, - scanType, maxLookbackAge); + scanType, maxLookbackAgeInMillis); } } @@ -447,7 +431,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver { Configuration conf = c.getEnvironment().getConfiguration(); if (isPhoenixTableTTLEnabled(conf)) { - setScanOptionsForFlushesAndCompactions(store, options, true); + setScanOptionsForFlushesAndCompactions(options); return; } if (!storeFileScanDoesntNeedAlteration(options)) { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index ebe92b8741..2fcf91dc64 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -50,9 +51,25 @@ import org.slf4j.LoggerFactory; import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX; /** - * The store scanner that implements Phoenix TTL and Max Lookback. Phoenix overrides the - * HBase implementation of data retention policies which is built at the cell level, and implements - * its row level data retention within this store scanner. + * The store scanner that implements compaction for Phoenix. Phoenix coproc overrides the scan + * options so that HBase store scanner retains all cells during compaction and flushes. Then this + * store scanner decides which cells to retain. This is required to ensure rows do not expire + * partially and to preserve all cells within Phoenix max lookback window. + * + * The compaction process is optimized for Phoenix. 
This optimization assumes that + * . A given delete family or delete family version marker is inserted to all column families + * . A given delete family version marker always delete a full version of a row. Please note + * delete family version markers are used only on index tables where mutations are always + * full row mutations. + * + * During major compaction, minor compaction and memstore flush, all cells (and delete markers) + * that are visible through the max lookback window are retained. Outside the max lookback window, + * (1) extra put cell versions, (2) delete markers and deleted cells that are not supposed to be + * kept (by the KeepDeletedCell option), and (3) expired cells are removed during major compaction. + * During flushes and minor compaction, expired cells and delete markers are not removed however + * deleted cells that are not supposed to be kept (by the KeepDeletedCell option) and extra put + * cell versions are removed. + * */ public class CompactionScanner implements InternalScanner { private static final Logger LOGGER = LoggerFactory.getLogger(CompactionScanner.class); @@ -64,7 +81,7 @@ public class CompactionScanner implements InternalScanner { private final RegionCoprocessorEnvironment env; private long maxLookbackWindowStart; private long ttlWindowStart; - private long ttl; + private long ttlInMillis; private final long maxLookbackInMillis; private int minVersion; private int maxVersion; @@ -81,13 +98,19 @@ public class CompactionScanner implements InternalScanner { private static Map<String, Long> maxLookbackMap = new ConcurrentHashMap<>(); private PhoenixLevelRowCompactor phoenixLevelRowCompactor; private HBaseLevelRowCompactor hBaseLevelRowCompactor; + private boolean major; + private long inputCellCount = 0; + private long outputCellCount = 0; + private boolean phoenixLevelOnly = false; public CompactionScanner(RegionCoprocessorEnvironment env, Store store, InternalScanner storeScanner, long maxLookbackInMillis, byte[] emptyCF, - byte[] emptyCQ) { + byte[] emptyCQ, + boolean major, + boolean keepDeleted) { this.storeScanner = storeScanner; this.region = env.getRegion(); this.store = store; @@ -99,8 +122,7 @@ public class CompactionScanner implements InternalScanner { columnFamilyName = store.getColumnFamilyName(); storeColumnFamily = columnFamilyName.getBytes(); tableName = region.getRegionInfo().getTable().getNameAsString(); - Long overriddenMaxLookback = - maxLookbackMap.remove(tableName + SEPARATOR + columnFamilyName); + Long overriddenMaxLookback = maxLookbackMap.get(tableName + SEPARATOR + columnFamilyName); this.maxLookbackInMillis = overriddenMaxLookback == null ? maxLookbackInMillis : Math.max(maxLookbackInMillis, overriddenMaxLookback); // The oldest scn is current time - maxLookbackInMillis. Phoenix sets the scan time range @@ -109,22 +131,25 @@ public class CompactionScanner implements InternalScanner { this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ? compactionTime : compactionTime - (this.maxLookbackInMillis + 1); ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); - ttl = cfd.getTimeToLive(); - this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - ttl * 1000; - ttl *= 1000; + this.major = major; + int ttl = major ? cfd.getTimeToLive() : HConstants.FOREVER; + ttlInMillis = ((long) ttl) * 1000; + this.ttlWindowStart = ttl == HConstants.FOREVER ? 
1 : compactionTime - ttlInMillis; this.maxLookbackWindowStart = Math.max(ttlWindowStart, maxLookbackWindowStart); this.minVersion = cfd.getMinVersions(); this.maxVersion = cfd.getMaxVersions(); - this.keepDeletedCells = cfd.getKeepDeletedCells(); + this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : cfd.getKeepDeletedCells(); familyCount = region.getTableDescriptor().getColumnFamilies().length; localIndex = columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX); - emptyCFStore = familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF)) - || localIndex; + emptyCFStore = major + ? familyCount == 1 || columnFamilyName.equals(Bytes.toString(emptyCF)) + || localIndex + : true; // we do not need to identify emptyCFStore for minor compaction or flushes phoenixLevelRowCompactor = new PhoenixLevelRowCompactor(); hBaseLevelRowCompactor = new HBaseLevelRowCompactor(); - LOGGER.info("Starting Phoenix CompactionScanner for table " + tableName + " store " - + columnFamilyName + " ttl " + ttl + "ms " + "max lookback " - + maxLookbackInMillis + "ms"); + LOGGER.info("Starting CompactionScanner for table " + tableName + " store " + + columnFamilyName + (major ? " major " : " not major ") + "compaction ttl " + + ttlInMillis + "ms " + "max lookback " + this.maxLookbackInMillis + "ms"); } /** @@ -138,16 +163,71 @@ public class CompactionScanner implements InternalScanner { } Long old = maxLookbackMap.putIfAbsent(tableName + SEPARATOR + columnFamilyName, maxLookbackInMillis); - if (old != null && old < maxLookbackInMillis) { + if (old != null) { maxLookbackMap.put(tableName + SEPARATOR + columnFamilyName, maxLookbackInMillis); } } + public static long getMaxLookbackInMillis(String tableName, String columnFamilyName, + long maxLookbackInMillis) { + if (tableName == null || columnFamilyName == null) { + return maxLookbackInMillis; + } + Long value = maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName); + return value == null + ? maxLookbackInMillis + : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + columnFamilyName); + } + static class CellTimeComparator implements Comparator<Cell> { + public static final CellTimeComparator COMPARATOR = new CellTimeComparator(); + @Override public int compare(Cell o1, Cell o2) { + long ts1 = o1.getTimestamp(); + long ts2 = o2.getTimestamp(); + if (ts1 == ts2) return 0; + if (ts1 > ts2) return -1; + return 1; + } + + @Override public boolean equals(Object obj) { + return false; + } + } + private void printRow(List<Cell> result, String title, boolean sort) { + List<Cell> row; + if (sort) { + row = new ArrayList<>(result); + Collections.sort(row, CellTimeComparator.COMPARATOR); + } else { + row = result; + } + System.out.println("---- " + title + " ----"); + System.out.println((major ? 
"Major " : "Not major ") + + "compaction time: " + compactionTime); + System.out.println("Max lookback window start time: " + maxLookbackWindowStart); + System.out.println("Max lookback in ms: " + maxLookbackInMillis); + System.out.println("TTL in ms: " + ttlInMillis); + boolean maxLookbackLine = false; + boolean ttlLine = false; + for (Cell cell : row) { + if (!maxLookbackLine && cell.getTimestamp() < maxLookbackWindowStart) { + System.out.println("-----> Max lookback window start time: " + maxLookbackWindowStart); + maxLookbackLine = true; + } else if (!ttlLine && cell.getTimestamp() < ttlWindowStart) { + System.out.println("-----> TTL window start time: " + ttlWindowStart); + ttlLine = true; + } + System.out.println(cell); + } + } @Override public boolean next(List<Cell> result) throws IOException { boolean hasMore = storeScanner.next(result); + inputCellCount += result.size(); if (!result.isEmpty()) { + // printRow(result, "Input for " + tableName + " " + columnFamilyName, true); // This is for debugging phoenixLevelRowCompactor.compact(result, false); + outputCellCount += result.size(); + // printRow(result, "Output for " + tableName + " " + columnFamilyName, true); // This is for debugging } return hasMore; } @@ -159,8 +239,10 @@ public class CompactionScanner implements InternalScanner { @Override public void close() throws IOException { - LOGGER.info("Closing Phoenix CompactionScanner for table " + tableName + " store " - + columnFamilyName); + LOGGER.info("Closing CompactionScanner for table " + tableName + " store " + + columnFamilyName + (major ? " major " : " not major ") + "compaction retained " + + outputCellCount + " of " + inputCellCount + " cells" + + (phoenixLevelOnly ? " phoenix level only" : "")); storeScanner.close(); } @@ -171,13 +253,19 @@ public class CompactionScanner implements InternalScanner { static class RowContext { Cell familyDeleteMarker = null; Cell familyVersionDeleteMarker = null; - List<Cell> columnDeleteMarkers = null; + List<Cell> columnDeleteMarkers = new ArrayList<>(); int version = 0; long maxTimestamp; long minTimestamp; + + private void init() { + familyDeleteMarker = null; + familyVersionDeleteMarker = null; + columnDeleteMarkers.clear(); + version = 0; + } private void addColumnDeleteMarker(Cell deleteMarker) { - if (columnDeleteMarkers == null) { - columnDeleteMarkers = new ArrayList<>(); + if (columnDeleteMarkers.isEmpty()) { columnDeleteMarkers.add(deleteMarker); return; } @@ -200,6 +288,8 @@ public class CompactionScanner implements InternalScanner { // Set it to null so it will be used once familyVersionDeleteMarker = null; } else { + // The same delete family marker may be retained multiple times. Duplicates will be + // removed later retainedCells.add(familyDeleteMarker); } } @@ -217,9 +307,13 @@ public class CompactionScanner implements InternalScanner { if ((CellUtil.matchingFamily(cell, dm)) && CellUtil.matchingQualifier(cell, dm)) { if (dm.getType() == Cell.Type.Delete) { - // Delete is for deleting a specific cell version. Thus, it can be used - // to delete only one cell. - columnDeleteMarkers.remove(i); + if (cell.getTimestamp() == dm.getTimestamp()) { + // Delete is for deleting a specific cell version. Thus, it can be used + // to delete only one cell. 
+ columnDeleteMarkers.remove(i); + } else { + continue; + } } if (maxTimestamp >= ttlWindowStart) { // Inside the TTL window @@ -253,6 +347,9 @@ public class CompactionScanner implements InternalScanner { Cell firstCell; LinkedList<Cell> deleteColumn = null; long ts; + // The next row version is formed by the first cell of each column. Similarly, the min + // max timestamp of the cells of a row version is determined by looking at just first + // cell of the columns for (LinkedList<Cell> column : columns) { firstCell = column.getFirst(); ts = firstCell.getTimestamp(); @@ -269,7 +366,7 @@ public class CompactionScanner implements InternalScanner { } } if (deleteColumn != null) { - // A row version do not cross a family delete marker. This means + // A row version cannot cross a family delete marker by definition. This means // min timestamp cannot be lower than the delete markers timestamp for (Cell cell : deleteColumn) { ts = cell.getTimestamp(); @@ -280,6 +377,41 @@ public class CompactionScanner implements InternalScanner { } } } + + /** + * This is used for Phoenix level compaction + */ + private void getNextRowVersionTimestamps(List<Cell> row, byte[] columnFamily) { + maxTimestamp = 0; + minTimestamp = Long.MAX_VALUE; + Cell deleteFamily = null; + long ts; + // The next row version is formed by the first cell of each column. Similarly, the min + // max timestamp of the cells of a row version is determined by looking at just first + // cell of the columns + for (Cell cell : row) { + ts = cell.getTimestamp(); + if ((cell.getType() == Cell.Type.DeleteFamily || + cell.getType() == Cell.Type.DeleteFamilyVersion) && + CellUtil.matchingFamily(cell, columnFamily)) { + deleteFamily = cell; + } + if (maxTimestamp < ts) { + maxTimestamp = ts; + } + if (minTimestamp > ts) { + minTimestamp = ts; + } + } + if (deleteFamily != null) { + // A row version cannot cross a family delete marker by definition. This means + // min timestamp cannot be lower than the delete markers timestamp + ts = deleteFamily.getTimestamp(); + if (ts < maxTimestamp) { + minTimestamp = ts + 1; + } + } + } } /** @@ -288,6 +420,8 @@ public class CompactionScanner implements InternalScanner { * */ class HBaseLevelRowCompactor { + private RowContext rowContext = new RowContext(); + private CompactionRowVersion rowVersion = new CompactionRowVersion(); /** * A compaction row version includes the latest put cell versions from each column such that * the cell versions do not cross delete family markers. In other words, the compaction row @@ -315,6 +449,10 @@ public class CompactionScanner implements InternalScanner { // The version of a row version. It is the minimum of the versions of the cells included // in the row version int version = 0; + + private void init() { + cells.clear(); + } @Override public String toString() { StringBuilder output = new StringBuilder(); @@ -331,14 +469,13 @@ public class CompactionScanner implements InternalScanner { * Decide if compaction row versions inside the TTL window should be retained. The * versions are retained if one of the following conditions holds * 1. The compaction row version is alive and its version is less than VERSIONS - * 2. The compaction row version is deleted and KeepDeletedCells is TTL - * 3. The compaction row version is deleted, its version is less than MIN_VERSIONS and - * KeepDeletedCells is TRUE + * 2. 
The compaction row version is deleted and KeepDeletedCells is not FALSE * */ private void retainInsideTTLWindow(CompactionRowVersion rowVersion, RowContext rowContext, List<Cell> retainedCells) { - if (rowContext.familyDeleteMarker == null && rowContext.familyVersionDeleteMarker == null) { + if (rowContext.familyDeleteMarker == null + && rowContext.familyVersionDeleteMarker == null) { // The compaction row version is alive if (rowVersion.version < maxVersion) { // Rule 1 @@ -346,9 +483,8 @@ public class CompactionScanner implements InternalScanner { } } else { // Deleted - if ((rowVersion.version < maxVersion && keepDeletedCells == KeepDeletedCells.TRUE) || - keepDeletedCells == KeepDeletedCells.TTL) { - // Retain based on rule 2 or 3 + if (rowVersion.version < maxVersion && keepDeletedCells != KeepDeletedCells.FALSE) { + // Retain based on rule 2 retainCells(rowVersion, rowContext, retainedCells); rowContext.retainFamilyDeleteMarker(retainedCells); } @@ -374,12 +510,9 @@ public class CompactionScanner implements InternalScanner { } } else { // Deleted compaction row version - if (keepDeletedCells == KeepDeletedCells.TTL && ( - (rowContext.familyVersionDeleteMarker != null && - rowContext.familyVersionDeleteMarker.getTimestamp() > ttlWindowStart) || - (rowContext.familyDeleteMarker != null && - rowContext.familyDeleteMarker.getTimestamp() > ttlWindowStart) - )) { + if (keepDeletedCells == KeepDeletedCells.TTL + && rowContext.familyDeleteMarker != null + && rowContext.familyDeleteMarker.getTimestamp() > ttlWindowStart) { // Rule 2 retainCells(rowVersion, rowContext, retainedCells); rowContext.retainFamilyDeleteMarker(retainedCells); @@ -406,10 +539,9 @@ public class CompactionScanner implements InternalScanner { */ private void formNextCompactionRowVersion(LinkedList<LinkedList<Cell>> columns, RowContext rowContext, List<Cell> retainedCells) { - CompactionRowVersion rowVersion = new CompactionRowVersion(); + rowVersion.init(); rowContext.getNextRowVersionTimestamps(columns, storeColumnFamily); rowVersion.ts = rowContext.maxTimestamp; - rowVersion.version = rowContext.version++; for (LinkedList<Cell> column : columns) { Cell cell = column.getFirst(); if (column.getFirst().getTimestamp() < rowContext.minTimestamp) { @@ -419,13 +551,15 @@ public class CompactionScanner implements InternalScanner { if (cell.getTimestamp() >= rowContext.maxTimestamp) { rowContext.familyDeleteMarker = cell; column.removeFirst(); + break; } continue; } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) { - if (cell.getTimestamp() >= rowContext.maxTimestamp) { + if (cell.getTimestamp() == rowVersion.ts) { rowContext.familyVersionDeleteMarker = cell; column.removeFirst(); + break; } continue; } @@ -440,6 +574,7 @@ public class CompactionScanner implements InternalScanner { if (rowVersion.cells.isEmpty()) { return; } + rowVersion.version = rowContext.version++; if (rowVersion.ts >= ttlWindowStart) { retainInsideTTLWindow(rowVersion, rowContext, retainedCells); } else { @@ -449,7 +584,7 @@ public class CompactionScanner implements InternalScanner { private void formCompactionRowVersions(LinkedList<LinkedList<Cell>> columns, List<Cell> result) { - RowContext rowContext = new RowContext(); + rowContext.init(); while (!columns.isEmpty()) { formNextCompactionRowVersion(columns, rowContext, result); // Remove the columns that are empty @@ -468,14 +603,10 @@ public class CompactionScanner implements InternalScanner { * the pair of family name and column qualifier. 
While doing that also add the delete * markers to a separate list. */ - private void formColumns(List<Cell> result, LinkedList<LinkedList<Cell>> columns, - List<Cell> deleteMarkers) { + private void formColumns(List<Cell> result, LinkedList<LinkedList<Cell>> columns) { Cell currentColumnCell = null; LinkedList<Cell> currentColumn = null; for (Cell cell : result) { - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } if (currentColumnCell == null) { currentColumn = new LinkedList<>(); currentColumnCell = cell; @@ -503,8 +634,7 @@ public class CompactionScanner implements InternalScanner { return; } LinkedList<LinkedList<Cell>> columns = new LinkedList<>(); - List<Cell> deleteMarkers = new ArrayList<>(); - formColumns(result, columns, deleteMarkers); + formColumns(result, columns); result.clear(); formCompactionRowVersions(columns, result); } @@ -520,6 +650,12 @@ public class CompactionScanner implements InternalScanner { * */ class PhoenixLevelRowCompactor { + private RowContext rowContext = new RowContext(); + List<Cell> lastRowVersion = new ArrayList<>(); + List<Cell> emptyColumn = new ArrayList<>(); + List<Cell> phoenixResult = new ArrayList<>(); + List<Cell> trimmedRow = new ArrayList<>(); + List<Cell> trimmedEmptyColumn = new ArrayList<>(); /** * The cells of the row (i.e., result) read from HBase store are lexicographically ordered @@ -529,33 +665,53 @@ public class CompactionScanner implements InternalScanner { * based on the pair of family name and column qualifier. * * The cells within the max lookback window except the once at the lower edge of the - * max lookback window are retained immediately and not included in the constructed columns. + * max lookback window (the last row of the max lookback window) are retained immediately. + * + * This method also returned the remaining cells (outside the max lookback window) of + * the empty colum */ - private void getColumns(List<Cell> result, LinkedList<LinkedList<Cell>> columns, - List<Cell> retainedCells) { + private void getLastRowVersionInMaxLookbackWindow(List<Cell> result, + List<Cell> lastRowVersion, List<Cell> retainedCells, List<Cell> emptyColumn) { Cell currentColumnCell = null; - LinkedList<Cell> currentColumn = null; + boolean isEmptyColumn = false; + Cell cellAtMaxLookbackWindowStart = null; for (Cell cell : result) { - if (cell.getTimestamp() > maxLookbackWindowStart) { + if (cell.getTimestamp() >= maxLookbackWindowStart) { retainedCells.add(cell); + if (cell.getTimestamp() == maxLookbackWindowStart) { + cellAtMaxLookbackWindowStart = cell; + } continue; } - if (currentColumnCell == null) { - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) { - columns.add(currentColumn); - currentColumn = new LinkedList<>(); + if (!major && cell.getType() != Cell.Type.Put) { + retainedCells.add(cell); + } + if (currentColumnCell == null || + !CellUtil.matchingColumn(cell, currentColumnCell)) { currentColumnCell = cell; - currentColumn.add(cell); - } else { - currentColumn.add(cell); + if (cell.getType() != Cell.Type.Delete + && cell.getType() != Cell.Type.DeleteColumn) { + // Include only delete family markers and put cells + // It is possible that this cell is not visible from the max lookback + // window. This happens when there is a mutation with the mutation timestamp + // equal to the max lookback window start timestamp. 
The following is to + // check for this case + if (cellAtMaxLookbackWindowStart == null + || !CellUtil.matchingColumn(cell, cellAtMaxLookbackWindowStart)) { + lastRowVersion.add(cell); + } + } + if (major && ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) { + isEmptyColumn = true; + } else { + isEmptyColumn = false; + } + } else if (major && isEmptyColumn) { + // We only need to keep one cell for every column for the last row version. + // So here we just form the empty column beyond the last row version + emptyColumn.add(cell); } } - if (currentColumn != null) { - columns.add(currentColumn); - } } /** @@ -576,10 +732,10 @@ public class CompactionScanner implements InternalScanner { if (previous == -1) { break; } - if (max - ts > ttl) { + if (max - ts > ttlInMillis) { max = input.get(previous).getTimestamp(); output.add(input.remove(previous)); - if (max - min > ttl) { + if (max - min > ttlInMillis) { closeGap(max, min, input, output); } return; @@ -591,93 +747,88 @@ public class CompactionScanner implements InternalScanner { /** * Retain the last row version visible through the max lookback window */ - private void retainCellsOfLastRowVersion(LinkedList<LinkedList<Cell>> columns, - List<Cell> retainedCells) { - if (columns.isEmpty()) { + private void retainCellsOfLastRowVersion(List<Cell> lastRow, + List<Cell> emptyColumn, List<Cell> retainedCells) { + if (lastRow.isEmpty()) { return; } - RowContext rowContext = new RowContext(); - rowContext.getNextRowVersionTimestamps(columns, storeColumnFamily); - List<Cell> retainedPutCells = new ArrayList<>(); - for (LinkedList<Cell> column : columns) { - Cell cell = column.getFirst(); - if (cell.getTimestamp() < rowContext.minTimestamp) { - continue; - } - if (cell.getType() == Cell.Type.Put) { - retainedPutCells.add(cell); - } else if (cell.getType() == Cell.Type.DeleteFamily || - cell.getType() == Cell.Type.DeleteFamilyVersion) { - if (cell.getTimestamp() >= rowContext.maxTimestamp) { - // This means that the row version outside the max lookback window is - // deleted and thus should not be visible to the scn queries - if (cell.getTimestamp() == maxLookbackWindowStart) { - // Include delete markers at maxLookbackWindowStart - retainedCells.add(cell); - } - return; - } - } else if (cell.getTimestamp() == maxLookbackWindowStart) { - // Include delete markers at maxLookbackWindowStart - retainedCells.add(cell); + rowContext.init(); + rowContext.getNextRowVersionTimestamps(lastRow, storeColumnFamily); + Cell firstCell = lastRow.get(0); + if (firstCell.getType() == Cell.Type.DeleteFamily || + firstCell.getType() == Cell.Type.DeleteFamilyVersion) { + if (firstCell.getTimestamp() >= rowContext.maxTimestamp) { + // This means that the row version outside the max lookback window is + // deleted and thus should not be visible to the scn queries + return; } } - if (compactionTime - rowContext.maxTimestamp > maxLookbackInMillis + ttl) { + + if (compactionTime - rowContext.maxTimestamp > maxLookbackInMillis + ttlInMillis) { // The row version should not be visible via the max lookback window. Nothing to do return; } - retainedCells.addAll(retainedPutCells); + retainedCells.addAll(lastRow); // If the gap between two back to back mutations is more than ttl then the older // mutation will be considered expired and masked. If the length of the time range of // a row version is not more than ttl, then we know the cells covered by the row // version are not apart from each other more than ttl and will not be masked. 
- if (rowContext.maxTimestamp - rowContext.minTimestamp <= ttl) { + if (rowContext.maxTimestamp - rowContext.minTimestamp <= ttlInMillis) { return; } // The quick time range check did not pass. We need get at least one empty cell to cover // the gap so that the row version will not be masked by PhoenixTTLRegionScanner. - List<Cell> emptyCellColumn = null; - for (LinkedList<Cell> column : columns) { - if (ScanUtil.isEmptyColumn(column.getFirst(), emptyCF, emptyCQ)) { - emptyCellColumn = column; - break; - } - } - if (emptyCellColumn == null) { + if (emptyColumn.isEmpty()) { return; } - int size = retainedPutCells.size(); + int size = lastRow.size(); long tsArray[] = new long[size]; int i = 0; - for (Cell cell : retainedPutCells) { + for (Cell cell : lastRow) { tsArray[i++] = cell.getTimestamp(); } Arrays.sort(tsArray); for (i = size - 1; i > 0; i--) { - if (tsArray[i] - tsArray[i - 1] > ttl) { - closeGap(tsArray[i], tsArray[i - 1], emptyCellColumn, retainedCells); + if (tsArray[i] - tsArray[i - 1] > ttlInMillis) { + closeGap(tsArray[i], tsArray[i - 1], emptyColumn, retainedCells); } } } + /** + * The retained cells includes the cells that are visible through the max lookback + * window and the additional empty column cells that are needed to reduce large time + * between the cells of the last row version. + */ private boolean retainCellsForMaxLookback(List<Cell> result, boolean regionLevel, List<Cell> retainedCells) { - LinkedList<LinkedList<Cell>> columns = new LinkedList<>(); - getColumns(result, columns, retainedCells); + + lastRowVersion.clear(); + emptyColumn.clear(); + getLastRowVersionInMaxLookbackWindow(result, lastRowVersion, retainedCells, + emptyColumn); + if (lastRowVersion.isEmpty()) { + return true; + } + if (!major) { + // We do not expire cells for minor compaction and memstore flushes + retainCellsOfLastRowVersion(lastRowVersion, emptyColumn, retainedCells); + return true; + } long maxTimestamp = 0; long minTimestamp = Long.MAX_VALUE; long ts; - for (LinkedList<Cell> column : columns) { - ts = column.getFirst().getTimestamp(); + for (Cell cell : lastRowVersion) { + ts =cell.getTimestamp(); if (ts > maxTimestamp) { maxTimestamp = ts; } - ts = column.getLast().getTimestamp(); + ts = cell.getTimestamp(); if (ts < minTimestamp) { minTimestamp = ts; } } - if (compactionTime - maxTimestamp > maxLookbackInMillis + ttl) { + if (compactionTime - maxTimestamp > maxLookbackInMillis + ttlInMillis) { if (!emptyCFStore && !regionLevel) { // The row version is more than maxLookbackInMillis + ttl old. We cannot decide // if we should retain it with the store level compaction when the current @@ -688,7 +839,7 @@ public class CompactionScanner implements InternalScanner { } // If the time gap between two back to back mutations is more than ttl then we know // that the row is expired within the time gap. - if (maxTimestamp - minTimestamp > ttl) { + if (maxTimestamp - minTimestamp > ttlInMillis) { if ((familyCount > 1 && !regionLevel && !localIndex)) { // When there are more than one column family for a given table and a row // version constructed at the store level covers a time span larger than ttl, @@ -699,38 +850,63 @@ public class CompactionScanner implements InternalScanner { return false; } // We either have one column family or are doing region level compaction. In both - // case, we can safely trim the cells beyond the first time gap larger ttl - int size = result.size(); + // case, we can safely trim the cells beyond the first time gap larger ttl. 
+ // Here we are interested in the gaps between the cells of the last row version + // amd thus we need to examine the gaps between these cells and the empty column. + // Please note that empty column is always updated for every mutation and so we + // just need empty column cells for the gap analysis. + int size = lastRowVersion.size(); + size += emptyColumn.size(); long tsArray[] = new long[size]; int i = 0; - for (Cell cell : result) { + for (Cell cell : lastRowVersion) { + tsArray[i++] = cell.getTimestamp(); + } + for (Cell cell : emptyColumn) { tsArray[i++] = cell.getTimestamp(); } Arrays.sort(tsArray); boolean gapFound = false; // Since timestamps are sorted in ascending order, traverse them in reverse order for (i = size - 1; i > 0; i--) { - if (tsArray[i] - tsArray[i - 1] > ttl) { + if (tsArray[i] - tsArray[i - 1] > ttlInMillis) { minTimestamp = tsArray[i]; gapFound = true; break; } } if (gapFound) { - List<Cell> trimmedResult = new ArrayList<>(size - i); - for (Cell cell : result) { + trimmedRow.clear(); + for (Cell cell : lastRowVersion) { if (cell.getTimestamp() >= minTimestamp) { - trimmedResult.add(cell); + trimmedRow.add(cell); } } - columns.clear(); - retainedCells.clear(); - getColumns(trimmedResult, columns, retainedCells); + lastRowVersion = trimmedRow; + trimmedEmptyColumn.clear();; + for (Cell cell : lastRowVersion) { + if (cell.getTimestamp() >= minTimestamp) { + trimmedEmptyColumn.add(cell); + } + } + emptyColumn = trimmedEmptyColumn; } } - retainCellsOfLastRowVersion(columns, retainedCells); + retainCellsOfLastRowVersion(lastRowVersion, emptyColumn, retainedCells); return true; } + private void removeDuplicates(List<Cell> input, List<Cell> output) { + Cell previousCell = null; + for (Cell cell : input) { + if (previousCell == null || + cell.getTimestamp() != previousCell.getTimestamp() || + cell.getType() != previousCell.getType() || + !CellUtil.matchingColumn(cell, previousCell)) { + output.add(cell); + } + previousCell = cell; + } + } /** * Compacts a single row at the Phoenix level. The result parameter is the input row and * modified to be the output of the compaction process. @@ -739,7 +915,7 @@ public class CompactionScanner implements InternalScanner { if (result.isEmpty()) { return; } - List<Cell> phoenixResult = new ArrayList<>(result.size()); + phoenixResult.clear(); if (!retainCellsForMaxLookback(result, regionLevel, phoenixResult)) { if (familyCount == 1 || regionLevel) { throw new RuntimeException("UNEXPECTED"); @@ -747,14 +923,17 @@ public class CompactionScanner implements InternalScanner { phoenixResult.clear(); compactRegionLevel(result, phoenixResult); } - if (maxVersion == 1 && minVersion == 0 && keepDeletedCells == KeepDeletedCells.FALSE) { - // We need to Phoenix level compaction only + if (maxVersion == 1 + && (!major + || (minVersion == 0 && keepDeletedCells == KeepDeletedCells.FALSE))) { + // We need Phoenix level compaction only Collections.sort(phoenixResult, CellComparator.getInstance()); result.clear(); - result.addAll(phoenixResult); + removeDuplicates(phoenixResult, result); + phoenixLevelOnly = true; return; } - // We may need to do retain more cells, and so we need to run HBase level compaction + // We may need to retain more cells, and so we need to run HBase level compaction // too. The result of two compactions will be merged and duplicate cells are removed. 
int phoenixResultSize = phoenixResult.size(); List<Cell> hbaseResult = new ArrayList<>(result); @@ -762,18 +941,7 @@ public class CompactionScanner implements InternalScanner { phoenixResult.addAll(hbaseResult); Collections.sort(phoenixResult, CellComparator.getInstance()); result.clear(); - Cell previousCell = null; - // Eliminate duplicates - for (Cell cell : phoenixResult) { - if (previousCell == null || - cell.getTimestamp() != previousCell.getTimestamp() || - cell.getType() != previousCell.getType() || - !CellUtil.matchingColumn(cell, previousCell)) { - result.add(cell); - } - previousCell = cell; - } - + removeDuplicates(phoenixResult, result); if (result.size() > phoenixResultSize) { LOGGER.debug("HBase level compaction retained " + (result.size() - phoenixResultSize) + " more cells"); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index d1b9f17112..d0e8361472 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -32,9 +32,11 @@ import java.security.PrivilegedExceptionAction; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import javax.annotation.concurrent.GuardedBy; @@ -59,6 +61,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -174,8 +177,26 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver private Configuration compactionConfig; private Configuration indexWriteConfig; private ReadOnlyProps indexWriteProps; - private boolean isPhoenixTableTTLEnabled; + private static Map<String, Long> maxLookbackMap = new ConcurrentHashMap<>(); + + public static void setMaxLookbackInMillis(String tableName, long maxLookbackInMillis) { + if (tableName == null) { + return; + } + maxLookbackMap.put(tableName, + maxLookbackInMillis); + } + + public static long getMaxLookbackInMillis(String tableName, long maxLookbackInMillis) { + if (tableName == null) { + return maxLookbackInMillis; + } + Long value = maxLookbackMap.get(tableName); + return value == null + ? 
maxLookbackInMillis + : maxLookbackMap.get(tableName); + } @Override public Optional<RegionObserver> getRegionObserver() { return Optional.of(this); @@ -205,9 +226,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER, QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER)); indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator()); - isPhoenixTableTTLEnabled = - e.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED, - QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED); } Configuration getUpsertSelectConfig() { @@ -581,92 +599,128 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver region.getTableDescriptor().getTableName().getName()) == 0); } + @Override + public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, + InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { + if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) { + return scanner; + } else { + return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { + @Override public InternalScanner run() throws Exception { + String tableName = c.getEnvironment().getRegion().getRegionInfo().getTable() + .getNameAsString(); + long maxLookbackInMillis = + UngroupedAggregateRegionObserver.getMaxLookbackInMillis( + tableName, + BaseScannerRegionObserverConstants.getMaxLookbackInMillis( + c.getEnvironment().getConfiguration())); + maxLookbackInMillis = CompactionScanner.getMaxLookbackInMillis(tableName, + store.getColumnFamilyName(), maxLookbackInMillis); + return new CompactionScanner(c.getEnvironment(), store, scanner, + maxLookbackInMillis, null, null, false, true); + } + }); + } + } + @Override public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, CompactionRequest request) throws IOException { - if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) { - final TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable(); - // Compaction and split upcalls run with the effective user context of the requesting user. - // This will lead to failure of cross cluster RPC if the effective user is not - // the login user. Switch to the login user context to ensure we have the expected - // security context. - return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { - @Override - public InternalScanner run() throws Exception { - InternalScanner internalScanner = scanner; - if (request.isMajor()) { - boolean isDisabled = false; - boolean isMultiTenantIndexTable = false; - if (tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) { - isMultiTenantIndexTable = true; - } - final String fullTableName = isMultiTenantIndexTable ? 
- SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(), - MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) : - tableName.getNameAsString(); - PTable table = null; - try (PhoenixConnection conn = QueryUtil.getConnectionOnServer( - compactionConfig).unwrap(PhoenixConnection.class)) { - table = conn.getTableNoCache(fullTableName); - } catch (Exception e) { - if (e instanceof TableNotFoundException) { - LOGGER.debug("Ignoring HBase table that is not a Phoenix table: " - + fullTableName); - // non-Phoenix HBase tables won't be found, do nothing - } else { - LOGGER.error( - "Unable to modify compaction scanner to retain deleted " - + "cells for a table with disabled Index; " - + fullTableName, e); - } - } - // The previous indexing design needs to retain delete markers and deleted - // cells to rebuild disabled indexes. Thus, we skip major compaction for - // them. GlobalIndexChecker is the coprocessor introduced by the current - // indexing design. - if (table != null && - !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName) && - !ServerUtil.hasCoprocessor(c.getEnvironment(), - GlobalIndexChecker.class.getName())) { - List<PTable> - indexes = - PTableType.INDEX.equals(table.getType()) ? - Lists.newArrayList(table) : - table.getIndexes(); - // FIXME need to handle views and indexes on views as well - for (PTable index : indexes) { - if (index.getIndexDisableTimestamp() != 0) { - LOGGER.info("Modifying major compaction scanner to retain " - + "deleted cells for a table with disabled index: " - + fullTableName); - isDisabled = true; - break; - } - } - } - if (table != null && !isDisabled && isPhoenixTableTTLEnabled) { - internalScanner = - new CompactionScanner(c.getEnvironment(), store, scanner, - MetaDataUtil.getMaxLookbackAge( - c.getEnvironment().getConfiguration(), table.getMaxLookbackAge()), - SchemaUtil.getEmptyColumnFamily(table), - table.getEncodingScheme() - == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ? - QueryConstants.EMPTY_COLUMN_BYTES : - table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME) - ); + + final TableName tableName = c.getEnvironment().getRegion().getRegionInfo().getTable(); + // Compaction and split upcalls run with the effective user context of the requesting user. + // This will lead to failure of cross cluster RPC if the effective user is not + // the login user. Switch to the login user context to ensure we have the expected + // security context. + return User.runAsLoginUser(new PrivilegedExceptionAction<InternalScanner>() { + @Override + public InternalScanner run() throws Exception { + InternalScanner internalScanner = scanner; + boolean keepDeleted = false; + boolean isMultiTenantIndexTable = false; + if (tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) { + isMultiTenantIndexTable = true; + } + final String fullTableName = isMultiTenantIndexTable ? 
+ SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(), + MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) : + tableName.getNameAsString(); + PTable table = null; + Long maxLookbackAge = null; + try (PhoenixConnection conn = QueryUtil.getConnectionOnServer( + compactionConfig).unwrap(PhoenixConnection.class)) { + table = conn.getTableNoCache(fullTableName); + maxLookbackAge = table.getMaxLookbackAge(); + UngroupedAggregateRegionObserver.setMaxLookbackInMillis( + tableName.getNameAsString(), + MetaDataUtil.getMaxLookbackAge( + c.getEnvironment().getConfiguration(), + maxLookbackAge)); + } catch (Exception e) { + if (e instanceof TableNotFoundException) { + LOGGER.debug("Ignoring HBase table that is not a Phoenix table: " + + fullTableName); + // non-Phoenix HBase tables won't be found, do nothing + } else { + LOGGER.error( + "Unable to modify compaction scanner to retain deleted " + + "cells for a table with disabled Index; " + + fullTableName, e); + } + } + // The previous indexing design needs to retain delete markers and deleted + // cells to rebuild disabled indexes. Thus, we skip major compaction for + // them. GlobalIndexChecker is the coprocessor introduced by the current + // indexing design. + if (table != null && + !PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName) && + !ServerUtil.hasCoprocessor(c.getEnvironment(), + GlobalIndexChecker.class.getName())) { + List<PTable> + indexes = + PTableType.INDEX.equals(table.getType()) ? + Lists.newArrayList(table) : + table.getIndexes(); + // FIXME need to handle views and indexes on views as well + for (PTable index : indexes) { + if (index.getIndexDisableTimestamp() != 0) { + LOGGER.info("Modifying major compaction scanner to retain " + + "deleted cells for a table with disabled index: " + + fullTableName); + keepDeleted = true; + break; } } + } + if (table != null + && isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) { + internalScanner = + new CompactionScanner(c.getEnvironment(), store, scanner, + MetaDataUtil.getMaxLookbackAge( + c.getEnvironment().getConfiguration(), + maxLookbackAge), + SchemaUtil.getEmptyColumnFamily(table), + table.getEncodingScheme() + == PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ? 
+ QueryConstants.EMPTY_COLUMN_BYTES : + table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME), + request.isMajor() || request.isAllFiles(), keepDeleted + ); + } + if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) { try { long clientTimeStamp = EnvironmentEdgeManager.currentTimeMillis(); - DelegateRegionCoprocessorEnvironment compactionConfEnv = + DelegateRegionCoprocessorEnvironment + compactionConfEnv = new DelegateRegionCoprocessorEnvironment(c.getEnvironment(), ConnectionType.COMPACTION_CONNECTION); - StatisticsCollector statisticsCollector = + StatisticsCollector + statisticsCollector = StatisticsCollectorFactory.createStatisticsCollector( - compactionConfEnv, tableName.getNameAsString(), clientTimeStamp, - store.getColumnFamilyDescriptor().getName()); + compactionConfEnv, tableName.getNameAsString(), + clientTimeStamp, + store.getColumnFamilyDescriptor().getName()); statisticsCollector.init(); internalScanner = statisticsCollector.createCompactionScanner(compactionConfEnv, @@ -678,11 +732,10 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver LOGGER.warn("Unable to collect stats for " + tableName, e); } } - return internalScanner; } - }); - } - return scanner; + return internalScanner; + } + }); } static PTable deserializeTable(byte[] b) { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java index f9900fdb7b..e7157aee95 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java @@ -22,12 +22,10 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.coprocessor.CompactionScanner; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixResultSet; -import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.BaseTest; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.schema.PTable; @@ -50,9 +48,7 @@ import org.junit.runners.Parameterized; import java.io.IOException; import java.sql.Connection; -import java.sql.Date; import java.sql.DriverManager; -import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -288,11 +284,12 @@ public class MaxLookbackExtendedIT extends BaseTest { long beforeFirstCompactSCN = EnvironmentEdgeManager.currentTimeMillis(); injectEdge.incrementValue(1); //new ts for major compaction majorCompact(dataTable); - majorCompact(indexTable); assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker); + majorCompact(indexTable); assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker); //wait for the lookback time. After this compactions should purge the deleted row - long timeToAdvance = hasTableLevelMaxLookback ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE; + long timeToAdvance = hasTableLevelMaxLookback + ? 
TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE; injectEdge.incrementValue(timeToAdvance * 1000); long beforeSecondCompactSCN = EnvironmentEdgeManager.currentTimeMillis(); String notDeletedRowSql = @@ -311,10 +308,14 @@ public class MaxLookbackExtendedIT extends BaseTest { Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() < beforeDeleteSCN + MAX_LOOKBACK_AGE * 1000); } + flush(dataTable); majorCompact(dataTable); - majorCompact(indexTable); // Deleted row versions should be removed. assertRawRowCount(conn, dataTable, ROWS_POPULATED); + + flush(indexTable); + majorCompact(indexTable); + // Deleted row versions should be removed. assertRawRowCount(conn, indexTable, ROWS_POPULATED); //deleted row should be gone, but not deleted row should still be there. @@ -425,7 +426,7 @@ public class MaxLookbackExtendedIT extends BaseTest { assertRawRowCount(conn, dataTable, originalRowCount); assertRawRowCount(conn, indexTable, originalRowCount); assertExplainPlan(conn, indexSql, dataTableName, indexName); - long timeToAdvance = (MAX_LOOKBACK_AGE * 1000) - + long timeToAdvance = (MAX_LOOKBACK_AGE * 1000L) - (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN); if (timeToAdvance > 0) { injectEdge.incrementValue(timeToAdvance); @@ -440,7 +441,7 @@ public class MaxLookbackExtendedIT extends BaseTest { assertRawRowCount(conn, dataTable, originalRowCount); assertRawRowCount(conn, indexTable, originalRowCount); //now wait the TTL - timeToAdvance = (ttl * 1000) - + timeToAdvance = (ttl * 1000L) - (EnvironmentEdgeManager.currentTimeMillis() - afterFirstInsertSCN); if (timeToAdvance > 0) { injectEdge.incrementValue(timeToAdvance); @@ -458,7 +459,7 @@ public class MaxLookbackExtendedIT extends BaseTest { Assert.assertEquals(0, rs.getInt(1)); // Increment the time by max lookback age and make sure that we can compact away // the now-expired rows - injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000); + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000L); majorCompact(dataTable); majorCompact(indexTable); assertRawRowCount(conn, dataTable, 0); @@ -579,34 +580,35 @@ public class MaxLookbackExtendedIT extends BaseTest { CompactionScanner.overrideMaxLookback(tableNameTwo, "A", maxLookbackTwo * 1000); CompactionScanner.overrideMaxLookback(tableNameTwo, "B", maxLookbackTwo * 1000); } - injectEdge.incrementValue(1); + injectEdge.incrementValue(1000); populateTable(tableNameOne); populateTable(tableNameTwo); - injectEdge.incrementValue(1); + injectEdge.incrementValue(1000); populateTable(tableNameOne); populateTable(tableNameTwo); - injectEdge.incrementValue(1); + injectEdge.incrementValue(1000); conn.createStatement().executeUpdate("DELETE FROM " + tableNameOne); conn.createStatement().executeUpdate("DELETE FROM " + tableNameTwo); conn.commit(); - injectEdge.incrementValue(1); // Move the time so that deleted row versions will be outside the maxlookback window - // of tableNameOne but the delete markers will be inside + // of tableNameOne but the delete markers will be inside (in this case on the lower + // edge of the window) injectEdge.incrementValue(maxLookbackOne * 1000); - // Compact both tables. Deleted row versions will be removed from tableNameOne as they - // are now outside its max lookback window. + // Compact both tables. Deleted row versions will be retained since the delete marker + // is retained. This is necessary to include the preimage of the row in the CDC + // stream. 
flush(TableName.valueOf(tableNameOne)); flush(TableName.valueOf(tableNameTwo)); majorCompact(TableName.valueOf(tableNameOne)); majorCompact(TableName.valueOf(tableNameTwo)); if (multiCF) { - assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("a"), 3); - assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("b"), 3); + assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("a"), 7); + assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("b"), 7); assertRawCellCount(conn, TableName.valueOf(tableNameTwo), Bytes.toBytes("a"), 11); assertRawCellCount(conn, TableName.valueOf(tableNameTwo), Bytes.toBytes("b"), 11); } else { - assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("a"), 1); - assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("b"), 1); + assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("a"), 5); + assertRawCellCount(conn, TableName.valueOf(tableNameOne), Bytes.toBytes("b"), 5); assertRawCellCount(conn, TableName.valueOf(tableNameTwo), Bytes.toBytes("a"), 9); assertRawCellCount(conn, TableName.valueOf(tableNameTwo), Bytes.toBytes("b"), 9); } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java index 3b6d1277e8..591016b37d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java @@ -53,6 +53,8 @@ import java.util.Map; import java.util.Properties; import java.util.Random; +import static org.junit.Assert.assertTrue; + @Category(NeedsOwnMiniClusterTest.class) @RunWith(Parameterized.class) public class TableTTLIT extends BaseTest { @@ -158,34 +160,46 @@ public class TableTTLIT extends BaseTest { final int maxLookbackAge = tableLevelMaxLooback != null ? tableLevelMaxLooback : MAX_LOOKBACK_AGE; final int maxDeleteCounter = maxLookbackAge == 0 ? 
1 : maxLookbackAge; final int maxCompactionCounter = ttl / 2; + final int maxFlushCounter = ttl; final int maxMaskingCounter = 2 * ttl; + final int maxVerificationCounter = 2 * ttl; final byte[] rowKey = Bytes.toBytes("a"); try (Connection conn = DriverManager.getConnection(getUrl())) { String tableName = generateUniqueName(); createTable(tableName); + conn.createStatement().execute("Alter Table " + tableName + " set \"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge); + conn.commit(); String noCompactTableName = generateUniqueName(); createTable(noCompactTableName); + conn.createStatement().execute("Alter Table " + noCompactTableName + " set \"phoenix.table.ttl.enabled\" = false"); + conn.commit(); long startTime = System.currentTimeMillis() + 1000; startTime = (startTime / 1000) * 1000; EnvironmentEdgeManager.injectEdge(injectEdge); injectEdge.setValue(startTime); int deleteCounter = RAND.nextInt(maxDeleteCounter) + 1; int compactionCounter = RAND.nextInt(maxCompactionCounter) + 1; + int flushCounter = RAND.nextInt(maxFlushCounter) + 1; int maskingCounter = RAND.nextInt(maxMaskingCounter) + 1; - boolean afterCompaction = false; + int verificationCounter = RAND.nextInt(maxVerificationCounter) + 1; for (int i = 0; i < 500; i++) { + if (flushCounter-- == 0) { + injectEdge.incrementValue(1000); + LOG.info("Flush " + i + " current time: " + injectEdge.currentTime()); + flush(TableName.valueOf(tableName)); + flushCounter = RAND.nextInt(maxFlushCounter) + 1; + } if (compactionCounter-- == 0) { injectEdge.incrementValue(1000); - LOG.debug("Compaction " + i + " current time: " + injectEdge.currentTime()); + LOG.info("Compaction " + i + " current time: " + injectEdge.currentTime()); flush(TableName.valueOf(tableName)); majorCompact(TableName.valueOf(tableName)); compactionCounter = RAND.nextInt(maxCompactionCounter) + 1; - afterCompaction = true; } if (maskingCounter-- == 0) { updateRow(conn, tableName, noCompactTableName, "a"); injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 1000); - LOG.debug("Masking " + i + " current time: " + injectEdge.currentTime()); + LOG.info("Masking " + i + " current time: " + injectEdge.currentTime()); ResultSet rs = conn.createStatement().executeQuery( "SELECT count(*) FROM " + tableName); Assert.assertTrue(rs.next()); @@ -200,20 +214,18 @@ public class TableTTLIT extends BaseTest { maskingCounter = RAND.nextInt(maxMaskingCounter) + 1; } if (deleteCounter-- == 0) { - LOG.debug("Delete " + i + " current time: " + injectEdge.currentTime()); + LOG.info("Delete " + i + " current time: " + injectEdge.currentTime()); deleteRow(conn, tableName, "a"); deleteRow(conn, noCompactTableName, "a"); deleteCounter = RAND.nextInt(maxDeleteCounter) + 1; injectEdge.incrementValue(1000); } + injectEdge.incrementValue(1000); updateRow(conn, tableName, noCompactTableName, "a"); - if (!afterCompaction) { - injectEdge.incrementValue(1000); - // We are interested in the correctness of compaction and masking. Thus, we - // only need to do the latest version and scn queries to after compaction. 
+ if (verificationCounter-- > 0) { continue; } - afterCompaction = false; + verificationCounter = RAND.nextInt(maxVerificationCounter) + 1; compareRow(conn, tableName, noCompactTableName, "a", MAX_COLUMN_INDEX); long scn = injectEdge.currentTime() - maxLookbackAge * 1000; long scnEnd = injectEdge.currentTime(); @@ -227,7 +239,6 @@ public class TableTTLIT extends BaseTest { MAX_COLUMN_INDEX); } } - injectEdge.incrementValue(1000); } } } @@ -257,16 +268,16 @@ public class TableTTLIT extends BaseTest { // At every flush, extra cell versions should be removed. // MAX_COLUMN_INDEX table columns and one empty column will be retained for // each row version. - TestUtil.assertRawCellCount(conn, TableName.valueOf(tableName), row, - (i + 1) * (MAX_COLUMN_INDEX + 1) * versions); + assertTrue(TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), row) + <= (i + 1) * (MAX_COLUMN_INDEX + 1) * versions); } // Run one minor compaction (in case no minor compaction has happened yet) Admin admin = utility.getAdmin(); admin.compact(TableName.valueOf(tableName)); int waitCount = 0; while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName), - Bytes.toBytes("a")) < flushCount * (MAX_COLUMN_INDEX + 1) * versions) { - // Wait for major compactions to happen + Bytes.toBytes("a")) >= flushCount * (MAX_COLUMN_INDEX + 1) * versions) { + // Wait for minor compactions to happen Thread.sleep(1000); waitCount++; if (waitCount > 30) { diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index c15bd407c9..649c48844d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -977,7 +977,6 @@ public class TestUtil { public static CellCount getCellCount(Table table, boolean isRaw) throws IOException { Scan s = new Scan(); s.setRaw(isRaw); - ; s.readAllVersions(); CellCount cellCount = new CellCount();