This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new d77f473100 PHOENIX-7314 Enable CompactionScanner for flushes and minor 
compaction (#1896)
d77f473100 is described below

commit d77f4731005c5a8ffb593c7c0affcad9903bb4ce
Author: Kadir Ozdemir <37155482+kadiro...@users.noreply.github.com>
AuthorDate: Fri Jun 7 23:00:24 2024 +0300

    PHOENIX-7314 Enable CompactionScanner for flushes and minor compaction 
(#1896)
---
 .../coprocessor/BaseScannerRegionObserver.java     |  48 +--
 .../phoenix/coprocessor/CompactionScanner.java     | 462 ++++++++++++++-------
 .../UngroupedAggregateRegionObserver.java          | 219 ++++++----
 .../phoenix/end2end/MaxLookbackExtendedIT.java     |  44 +-
 .../org/apache/phoenix/end2end/TableTTLIT.java     |  41 +-
 .../java/org/apache/phoenix/util/TestUtil.java     |   1 -
 6 files changed, 516 insertions(+), 299 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index d27a187dd1..b25acfb72c 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -357,21 +357,12 @@ abstract public class BaseScannerRegionObserver 
implements RegionObserver {
                 dataRegion, indexMaintainer, null, viewConstants, null, null, 
projector, ptr, useQualiferAsListIndex);
     }
 
-    public void setScanOptionsForFlushesAndCompactions(Store store, 
ScanOptions options,
-            boolean retainAllVersions) {
+    public void setScanOptionsForFlushesAndCompactions(ScanOptions options) {
         // We want the store to give us all the deleted cells to 
StoreCompactionScanner
         options.setKeepDeletedCells(KeepDeletedCells.TTL);
         options.setTTL(HConstants.FOREVER);
-        if (retainAllVersions) {
-            options.setMaxVersions(Integer.MAX_VALUE);
-            options.setMinVersions(Integer.MAX_VALUE);
-        } else {
-            options.setMinVersions(Math.max(Math.max(options.getMaxVersions(),
-                    store.getColumnFamilyDescriptor().getMaxVersions()), 1));
-            options.setMinVersions(Math.max(Math.max(options.getMinVersions(),
-                    store.getColumnFamilyDescriptor().getMaxVersions()), 1));
-        }
-
+        options.setMaxVersions(Integer.MAX_VALUE);
+        options.setMinVersions(Integer.MAX_VALUE);
     }
 
     @Override
@@ -380,16 +371,13 @@ abstract public class BaseScannerRegionObserver 
implements RegionObserver {
             CompactionRequest request) throws IOException {
         Configuration conf = c.getEnvironment().getConfiguration();
         if (isPhoenixTableTTLEnabled(conf)) {
-            boolean retainAllVersions =  isMaxLookbackTimeEnabled(
-                    
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf))
-                    || request.isMajor();
-            setScanOptionsForFlushesAndCompactions(store, options, 
retainAllVersions);
+            setScanOptionsForFlushesAndCompactions(options);
             return;
         }
-        long maxLookbackAge = getMaxLookbackAge(c);
-        if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
+        long maxLookbackAgeInMillis = getMaxLookbackAge(c);
+        if (isMaxLookbackTimeEnabled(maxLookbackAgeInMillis)) {
             
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, 
store,
-                    scanType, maxLookbackAge);
+                    scanType, maxLookbackAgeInMillis);
         }
     }
 
@@ -399,16 +387,14 @@ abstract public class BaseScannerRegionObserver 
implements RegionObserver {
         Configuration conf = c.getEnvironment().getConfiguration();
 
         if (isPhoenixTableTTLEnabled(conf)) {
-            boolean retainAllVersions =  isMaxLookbackTimeEnabled(
-                    
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
-            setScanOptionsForFlushesAndCompactions(store, options, 
retainAllVersions);
+            setScanOptionsForFlushesAndCompactions(options);
             return;
         }
 
-        long maxLookbackAge = getMaxLookbackAge(c);
-        if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
+        long maxLookbackAgeInMillis = getMaxLookbackAge(c);
+        if (isMaxLookbackTimeEnabled(maxLookbackAgeInMillis)) {
             
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, 
store,
-                    ScanType.COMPACT_RETAIN_DELETES, maxLookbackAge);
+                    ScanType.COMPACT_RETAIN_DELETES, maxLookbackAgeInMillis);
         }
     }
 
@@ -418,13 +404,11 @@ abstract public class BaseScannerRegionObserver 
implements RegionObserver {
             throws IOException {
         Configuration conf = c.getEnvironment().getConfiguration();
         if (isPhoenixTableTTLEnabled(conf)) {
-            boolean retainAllVersions =  isMaxLookbackTimeEnabled(
-                    
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(conf));
-            setScanOptionsForFlushesAndCompactions(store, options, 
retainAllVersions);
+            setScanOptionsForFlushesAndCompactions(options);
             return;
         }
-        long maxLookbackAge = getMaxLookbackAge(c);
-        if (isMaxLookbackTimeEnabled(maxLookbackAge)) {
+        long maxLookbackAgeInMillis = getMaxLookbackAge(c);
+        if (isMaxLookbackTimeEnabled(maxLookbackAgeInMillis)) {
             MemoryCompactionPolicy inMemPolicy =
                     store.getColumnFamilyDescriptor().getInMemoryCompaction();
             ScanType scanType;
@@ -437,7 +421,7 @@ abstract public class BaseScannerRegionObserver implements 
RegionObserver {
                 scanType = ScanType.COMPACT_RETAIN_DELETES;
             }
             
setScanOptionsForFlushesAndCompactionsWhenPhoenixTTLIsDisabled(conf, options, 
store,
-                    scanType, maxLookbackAge);
+                    scanType, maxLookbackAgeInMillis);
         }
     }
 
@@ -447,7 +431,7 @@ abstract public class BaseScannerRegionObserver implements 
RegionObserver {
 
         Configuration conf = c.getEnvironment().getConfiguration();
         if (isPhoenixTableTTLEnabled(conf)) {
-            setScanOptionsForFlushesAndCompactions(store, options, true);
+            setScanOptionsForFlushesAndCompactions(options);
             return;
         }
         if (!storeFileScanDoesntNeedAlteration(options)) {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index ebe92b8741..2fcf91dc64 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -50,9 +51,25 @@ import org.slf4j.LoggerFactory;
 import static 
org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
 
 /**
- * The store scanner that implements Phoenix TTL and Max Lookback. Phoenix 
overrides the
- * HBase implementation of data retention policies which is built at the cell 
level, and implements
- * its row level data retention within this store scanner.
+ * The store scanner that implements compaction for Phoenix. Phoenix coproc 
overrides the scan
+ * options so that HBase store scanner retains all cells during compaction and 
flushes. Then this
+ * store scanner decides which cells to retain. This is required to ensure 
rows do not expire
+ * partially and to preserve all cells within Phoenix max lookback window.
+ *
+ * The compaction process is optimized for Phoenix. This optimization assumes 
that
+ * . A given delete family or delete family version marker is inserted to all 
column families
+ * . A given delete family version marker always deletes a full version of a 
row. Please note
+ *   delete family version markers are used only on index tables where 
mutations are always
+ *   full row mutations.
+ *
+ *  During major compaction, minor compaction and memstore flush, all cells 
(and delete markers)
+ *  that are visible through the max lookback window are retained. Outside the 
max lookback window,
+ *  (1) extra put cell versions, (2) delete markers and deleted cells that are 
not supposed to be
+ *  kept (by the KeepDeletedCells option), and (3) expired cells are removed 
during major compaction.
+ *  During flushes and minor compaction, expired cells and delete markers are 
not removed; however,
+ *  deleted cells that are not supposed to be kept (by the KeepDeletedCells 
option) and extra put
+ *  cell versions are removed.
+ *
  */
 public class CompactionScanner implements InternalScanner {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CompactionScanner.class);
@@ -64,7 +81,7 @@ public class CompactionScanner implements InternalScanner {
     private final RegionCoprocessorEnvironment env;
     private long maxLookbackWindowStart;
     private long ttlWindowStart;
-    private long ttl;
+    private long ttlInMillis;
     private final long maxLookbackInMillis;
     private int minVersion;
     private int maxVersion;
@@ -81,13 +98,19 @@ public class CompactionScanner implements InternalScanner {
     private static Map<String, Long> maxLookbackMap = new 
ConcurrentHashMap<>();
     private PhoenixLevelRowCompactor phoenixLevelRowCompactor;
     private HBaseLevelRowCompactor hBaseLevelRowCompactor;
+    private boolean major;
+    private long inputCellCount = 0;
+    private long outputCellCount = 0;
+    private boolean phoenixLevelOnly = false;
 
     public CompactionScanner(RegionCoprocessorEnvironment env,
             Store store,
             InternalScanner storeScanner,
             long maxLookbackInMillis,
             byte[] emptyCF,
-            byte[] emptyCQ) {
+            byte[] emptyCQ,
+            boolean major,
+            boolean keepDeleted) {
         this.storeScanner = storeScanner;
         this.region = env.getRegion();
         this.store = store;
@@ -99,8 +122,7 @@ public class CompactionScanner implements InternalScanner {
         columnFamilyName = store.getColumnFamilyName();
         storeColumnFamily = columnFamilyName.getBytes();
         tableName = region.getRegionInfo().getTable().getNameAsString();
-        Long overriddenMaxLookback =
-                maxLookbackMap.remove(tableName + SEPARATOR + 
columnFamilyName);
+        Long overriddenMaxLookback = maxLookbackMap.get(tableName + SEPARATOR 
+ columnFamilyName);
         this.maxLookbackInMillis = overriddenMaxLookback == null ?
                 maxLookbackInMillis : Math.max(maxLookbackInMillis, 
overriddenMaxLookback);
         // The oldest scn is current time - maxLookbackInMillis. Phoenix sets 
the scan time range
@@ -109,22 +131,25 @@ public class CompactionScanner implements InternalScanner 
{
         this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ?
                 compactionTime : compactionTime - (this.maxLookbackInMillis + 
1);
         ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
-        ttl = cfd.getTimeToLive();
-        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - 
ttl * 1000;
-        ttl *= 1000;
+        this.major = major;
+        int ttl = major ? cfd.getTimeToLive() : HConstants.FOREVER;
+        ttlInMillis = ((long) ttl) * 1000;
+        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - 
ttlInMillis;
         this.maxLookbackWindowStart = Math.max(ttlWindowStart, 
maxLookbackWindowStart);
         this.minVersion = cfd.getMinVersions();
         this.maxVersion = cfd.getMaxVersions();
-        this.keepDeletedCells = cfd.getKeepDeletedCells();
+        this.keepDeletedCells = keepDeleted ? KeepDeletedCells.TTL : 
cfd.getKeepDeletedCells();
         familyCount = region.getTableDescriptor().getColumnFamilies().length;
         localIndex = 
columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
-        emptyCFStore = familyCount == 1 || 
columnFamilyName.equals(Bytes.toString(emptyCF))
-                        || localIndex;
+        emptyCFStore = major
+                ? familyCount == 1 || 
columnFamilyName.equals(Bytes.toString(emptyCF))
+                        || localIndex
+                : true; // we do not need to identify emptyCFStore for minor 
compaction or flushes
         phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
         hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
-        LOGGER.info("Starting Phoenix CompactionScanner for table " + 
tableName + " store "
-                + columnFamilyName + " ttl " + ttl + "ms " + "max lookback "
-                + maxLookbackInMillis + "ms");
+        LOGGER.info("Starting CompactionScanner for table " + tableName + " 
store "
+                + columnFamilyName + (major ? " major " : " not major ") + 
"compaction ttl "
+                + ttlInMillis + "ms " + "max lookback " + 
this.maxLookbackInMillis + "ms");
     }
 
     /**
@@ -138,16 +163,71 @@ public class CompactionScanner implements InternalScanner 
{
         }
         Long old = maxLookbackMap.putIfAbsent(tableName + SEPARATOR + 
columnFamilyName,
                 maxLookbackInMillis);
-        if (old != null && old < maxLookbackInMillis) {
+        if (old != null) {
             maxLookbackMap.put(tableName + SEPARATOR + columnFamilyName, 
maxLookbackInMillis);
         }
     }
 
+    public static long getMaxLookbackInMillis(String tableName, String 
columnFamilyName,
+            long maxLookbackInMillis) {
+        if (tableName == null || columnFamilyName == null) {
+            return maxLookbackInMillis;
+        }
+        Long value = maxLookbackMap.get(tableName + 
CompactionScanner.SEPARATOR + columnFamilyName);
+        return value == null
+                ? maxLookbackInMillis
+                : maxLookbackMap.get(tableName + CompactionScanner.SEPARATOR + 
columnFamilyName);
+    }
+    static class CellTimeComparator implements Comparator<Cell> {
+        public static final CellTimeComparator COMPARATOR = new 
CellTimeComparator();
+        @Override public int compare(Cell o1, Cell o2) {
+            long ts1 = o1.getTimestamp();
+            long ts2 = o2.getTimestamp();
+            if (ts1 == ts2) return 0;
+            if (ts1 > ts2) return -1;
+            return 1;
+        }
+
+        @Override public boolean equals(Object obj) {
+            return false;
+        }
+    }
+    private void printRow(List<Cell> result, String title, boolean sort) {
+        List<Cell> row;
+        if (sort) {
+            row = new ArrayList<>(result);
+            Collections.sort(row, CellTimeComparator.COMPARATOR);
+        } else {
+            row = result;
+        }
+        System.out.println("---- " + title + " ----");
+        System.out.println((major ? "Major " : "Not major ")
+                + "compaction time: " + compactionTime);
+        System.out.println("Max lookback window start time: " + 
maxLookbackWindowStart);
+        System.out.println("Max lookback in ms: " + maxLookbackInMillis);
+        System.out.println("TTL in ms: " + ttlInMillis);
+        boolean maxLookbackLine = false;
+        boolean ttlLine = false;
+        for (Cell cell : row) {
+            if (!maxLookbackLine && cell.getTimestamp() < 
maxLookbackWindowStart) {
+                System.out.println("-----> Max lookback window start time: " + 
maxLookbackWindowStart);
+                maxLookbackLine = true;
+            } else if (!ttlLine && cell.getTimestamp() < ttlWindowStart) {
+                System.out.println("-----> TTL window start time: " + 
ttlWindowStart);
+                ttlLine = true;
+            }
+            System.out.println(cell);
+        }
+    }
     @Override
     public boolean next(List<Cell> result) throws IOException {
         boolean hasMore = storeScanner.next(result);
+        inputCellCount += result.size();
         if (!result.isEmpty()) {
+            // printRow(result, "Input for " + tableName + " " + 
columnFamilyName, true); // This is for debugging
             phoenixLevelRowCompactor.compact(result, false);
+            outputCellCount += result.size();
+            // printRow(result, "Output for " + tableName + " " + 
columnFamilyName, true); // This is for debugging
         }
         return hasMore;
     }
@@ -159,8 +239,10 @@ public class CompactionScanner implements InternalScanner {
 
     @Override
     public void close() throws IOException {
-        LOGGER.info("Closing Phoenix CompactionScanner for table " + tableName 
+ " store "
-                + columnFamilyName);
+        LOGGER.info("Closing CompactionScanner for table " + tableName + " 
store "
+                + columnFamilyName + (major ? " major " : " not major ") + 
"compaction retained "
+                + outputCellCount + " of " + inputCellCount + " cells"
+                + (phoenixLevelOnly ? " phoenix level only" : ""));
         storeScanner.close();
     }
 
@@ -171,13 +253,19 @@ public class CompactionScanner implements InternalScanner 
{
     static class RowContext {
         Cell familyDeleteMarker = null;
         Cell familyVersionDeleteMarker = null;
-        List<Cell> columnDeleteMarkers = null;
+        List<Cell> columnDeleteMarkers = new ArrayList<>();
         int version = 0;
         long maxTimestamp;
         long minTimestamp;
+
+        private void init() {
+            familyDeleteMarker = null;
+            familyVersionDeleteMarker = null;
+            columnDeleteMarkers.clear();
+            version = 0;
+        }
         private void addColumnDeleteMarker(Cell deleteMarker) {
-            if (columnDeleteMarkers == null) {
-                columnDeleteMarkers = new ArrayList<>();
+            if (columnDeleteMarkers.isEmpty()) {
                 columnDeleteMarkers.add(deleteMarker);
                 return;
             }
@@ -200,6 +288,8 @@ public class CompactionScanner implements InternalScanner {
                 // Set it to null so it will be used once
                 familyVersionDeleteMarker = null;
             } else {
+                // The same delete family marker may be retained multiple 
times. Duplicates will be
+                // removed later
                 retainedCells.add(familyDeleteMarker);
             }
         }
@@ -217,9 +307,13 @@ public class CompactionScanner implements InternalScanner {
                 if ((CellUtil.matchingFamily(cell, dm)) &&
                         CellUtil.matchingQualifier(cell, dm)) {
                     if (dm.getType() == Cell.Type.Delete) {
-                        // Delete is for deleting a specific cell version. 
Thus, it can be used
-                        // to delete only one cell.
-                        columnDeleteMarkers.remove(i);
+                        if (cell.getTimestamp() == dm.getTimestamp()) {
+                            // Delete is for deleting a specific cell version. 
Thus, it can be used
+                            // to delete only one cell.
+                            columnDeleteMarkers.remove(i);
+                        } else {
+                            continue;
+                        }
                     }
                     if (maxTimestamp >= ttlWindowStart) {
                         // Inside the TTL window
@@ -253,6 +347,9 @@ public class CompactionScanner implements InternalScanner {
             Cell firstCell;
             LinkedList<Cell> deleteColumn = null;
             long ts;
+            // The next row version is formed by the first cell of each 
column. Similarly, the min
+            // and max timestamps of the cells of a row version are determined 
by looking at just
+            // the first cell of each column
             for (LinkedList<Cell> column : columns) {
                 firstCell = column.getFirst();
                 ts = firstCell.getTimestamp();
@@ -269,7 +366,7 @@ public class CompactionScanner implements InternalScanner {
                 }
             }
             if (deleteColumn != null) {
-                // A row version do not cross a family delete marker. This 
means
+                // A row version cannot cross a family delete marker by 
definition. This means
                 // min timestamp cannot be lower than the delete markers 
timestamp
                 for (Cell cell : deleteColumn) {
                     ts = cell.getTimestamp();
@@ -280,6 +377,41 @@ public class CompactionScanner implements InternalScanner {
                 }
             }
         }
+
+        /**
+         * This is used for Phoenix level compaction
+         */
+        private void getNextRowVersionTimestamps(List<Cell> row, byte[] 
columnFamily) {
+            maxTimestamp = 0;
+            minTimestamp = Long.MAX_VALUE;
+            Cell deleteFamily = null;
+            long ts;
+            // The next row version is formed by the first cell of each 
column. Similarly, the min
+            // and max timestamps of the cells of a row version are determined 
by looking at just
+            // the first cell of each column
+            for (Cell cell : row) {
+                ts = cell.getTimestamp();
+                if ((cell.getType() == Cell.Type.DeleteFamily ||
+                        cell.getType() == Cell.Type.DeleteFamilyVersion) &&
+                        CellUtil.matchingFamily(cell, columnFamily)) {
+                    deleteFamily = cell;
+                }
+                if (maxTimestamp < ts) {
+                    maxTimestamp = ts;
+                }
+                if (minTimestamp > ts) {
+                    minTimestamp = ts;
+                }
+            }
+            if (deleteFamily != null) {
+                // A row version cannot cross a family delete marker by 
definition. This means
+                // min timestamp cannot be lower than the delete markers 
timestamp
+                ts = deleteFamily.getTimestamp();
+                if (ts < maxTimestamp) {
+                    minTimestamp = ts + 1;
+                }
+            }
+        }
     }
 
     /**
@@ -288,6 +420,8 @@ public class CompactionScanner implements InternalScanner {
      *
      */
     class HBaseLevelRowCompactor {
+        private RowContext rowContext = new RowContext();
+        private CompactionRowVersion rowVersion = new CompactionRowVersion();
         /**
          * A compaction row version includes the latest put cell versions from 
each column such that
          * the cell versions do not cross delete family markers. In other 
words, the compaction row
@@ -315,6 +449,10 @@ public class CompactionScanner implements InternalScanner {
             // The version of a row version. It is the minimum of the versions 
of the cells included
             // in the row version
             int version = 0;
+
+            private void init() {
+                cells.clear();
+            }
             @Override
             public String toString() {
                 StringBuilder output = new StringBuilder();
@@ -331,14 +469,13 @@ public class CompactionScanner implements InternalScanner 
{
          * Decide if compaction row versions inside the TTL window should be 
retained. The
          * versions are retained if one of the following conditions holds
          * 1. The compaction row version is alive and its version is less than 
VERSIONS
-         * 2. The compaction row version is deleted and KeepDeletedCells is TTL
-         * 3. The compaction row version is deleted, its version is less than 
MIN_VERSIONS and
-         * KeepDeletedCells is TRUE
+         * 2. The compaction row version is deleted and KeepDeletedCells is 
not FALSE
          *
          */
         private void retainInsideTTLWindow(CompactionRowVersion rowVersion, 
RowContext rowContext,
                 List<Cell> retainedCells) {
-            if (rowContext.familyDeleteMarker == null && 
rowContext.familyVersionDeleteMarker == null) {
+            if (rowContext.familyDeleteMarker == null
+                    && rowContext.familyVersionDeleteMarker == null) {
                 // The compaction row version is alive
                 if (rowVersion.version < maxVersion) {
                     // Rule 1
@@ -346,9 +483,8 @@ public class CompactionScanner implements InternalScanner {
                 }
             } else {
                 // Deleted
-                if ((rowVersion.version < maxVersion && keepDeletedCells == 
KeepDeletedCells.TRUE) ||
-                        keepDeletedCells == KeepDeletedCells.TTL) {
-                    // Retain based on rule 2 or 3
+                if (rowVersion.version < maxVersion && keepDeletedCells != 
KeepDeletedCells.FALSE) {
+                    // Retain based on rule 2
                     retainCells(rowVersion, rowContext, retainedCells);
                     rowContext.retainFamilyDeleteMarker(retainedCells);
                 }
@@ -374,12 +510,9 @@ public class CompactionScanner implements InternalScanner {
                 }
             } else {
                 // Deleted compaction row version
-                if (keepDeletedCells == KeepDeletedCells.TTL && (
-                        (rowContext.familyVersionDeleteMarker != null &&
-                                
rowContext.familyVersionDeleteMarker.getTimestamp() > ttlWindowStart) ||
-                                (rowContext.familyDeleteMarker != null &&
-                                        
rowContext.familyDeleteMarker.getTimestamp() > ttlWindowStart)
-                )) {
+                if (keepDeletedCells == KeepDeletedCells.TTL
+                        && rowContext.familyDeleteMarker != null
+                        && rowContext.familyDeleteMarker.getTimestamp() > 
ttlWindowStart) {
                     // Rule 2
                     retainCells(rowVersion, rowContext, retainedCells);
                     rowContext.retainFamilyDeleteMarker(retainedCells);
@@ -406,10 +539,9 @@ public class CompactionScanner implements InternalScanner {
          */
         private void formNextCompactionRowVersion(LinkedList<LinkedList<Cell>> 
columns,
                 RowContext rowContext, List<Cell> retainedCells) {
-            CompactionRowVersion rowVersion = new CompactionRowVersion();
+            rowVersion.init();
             rowContext.getNextRowVersionTimestamps(columns, storeColumnFamily);
             rowVersion.ts = rowContext.maxTimestamp;
-            rowVersion.version = rowContext.version++;
             for (LinkedList<Cell> column : columns) {
                 Cell cell = column.getFirst();
                 if (column.getFirst().getTimestamp() < 
rowContext.minTimestamp) {
@@ -419,13 +551,15 @@ public class CompactionScanner implements InternalScanner 
{
                     if (cell.getTimestamp() >= rowContext.maxTimestamp) {
                         rowContext.familyDeleteMarker = cell;
                         column.removeFirst();
+                        break;
                     }
                     continue;
                 }
                 else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
-                    if (cell.getTimestamp() >= rowContext.maxTimestamp) {
+                    if (cell.getTimestamp() == rowVersion.ts) {
                         rowContext.familyVersionDeleteMarker = cell;
                         column.removeFirst();
+                        break;
                     }
                     continue;
                 }
@@ -440,6 +574,7 @@ public class CompactionScanner implements InternalScanner {
             if (rowVersion.cells.isEmpty()) {
                 return;
             }
+            rowVersion.version = rowContext.version++;
             if (rowVersion.ts >= ttlWindowStart) {
                 retainInsideTTLWindow(rowVersion, rowContext, retainedCells);
             } else {
@@ -449,7 +584,7 @@ public class CompactionScanner implements InternalScanner {
 
         private void formCompactionRowVersions(LinkedList<LinkedList<Cell>> 
columns,
                 List<Cell> result) {
-            RowContext rowContext = new RowContext();
+            rowContext.init();
             while (!columns.isEmpty()) {
                 formNextCompactionRowVersion(columns, rowContext, result);
                 // Remove the columns that are empty
@@ -468,14 +603,10 @@ public class CompactionScanner implements InternalScanner 
{
          * the pair of family name and column qualifier. While doing that also 
add the delete
          * markers to a separate list.
          */
-        private void formColumns(List<Cell> result, 
LinkedList<LinkedList<Cell>> columns,
-                List<Cell> deleteMarkers) {
+        private void formColumns(List<Cell> result, 
LinkedList<LinkedList<Cell>> columns) {
             Cell currentColumnCell = null;
             LinkedList<Cell> currentColumn = null;
             for (Cell cell : result) {
-                if (cell.getType() != Cell.Type.Put) {
-                    deleteMarkers.add(cell);
-                }
                 if (currentColumnCell == null) {
                     currentColumn = new LinkedList<>();
                     currentColumnCell = cell;
@@ -503,8 +634,7 @@ public class CompactionScanner implements InternalScanner {
                 return;
             }
             LinkedList<LinkedList<Cell>> columns = new LinkedList<>();
-            List<Cell> deleteMarkers = new ArrayList<>();
-            formColumns(result, columns, deleteMarkers);
+            formColumns(result, columns);
             result.clear();
             formCompactionRowVersions(columns, result);
         }
@@ -520,6 +650,12 @@ public class CompactionScanner implements InternalScanner {
      *
      */
     class PhoenixLevelRowCompactor {
+        private RowContext rowContext = new RowContext();
+        List<Cell> lastRowVersion = new ArrayList<>();
+        List<Cell> emptyColumn = new ArrayList<>();
+        List<Cell> phoenixResult = new ArrayList<>();
+        List<Cell> trimmedRow = new ArrayList<>();
+        List<Cell> trimmedEmptyColumn = new ArrayList<>();
 
         /**
          * The cells of the row (i.e., result) read from HBase store are 
lexicographically ordered
@@ -529,33 +665,53 @@ public class CompactionScanner implements InternalScanner 
{
          * based on the pair of family name and column qualifier.
          *
          * The cells within the max lookback window except the once at the 
lower edge of the
-         * max lookback window are retained immediately and not included in 
the constructed columns.
+         * max lookback window (the last row of the max lookback window) are 
retained immediately.
+         *
+         * This method also returns the remaining cells (outside the max 
lookback window) of
+         * the empty column.
          */
-        private void getColumns(List<Cell> result, 
LinkedList<LinkedList<Cell>> columns,
-                List<Cell> retainedCells) {
+        private void getLastRowVersionInMaxLookbackWindow(List<Cell> result,
+                List<Cell> lastRowVersion, List<Cell> retainedCells, 
List<Cell> emptyColumn) {
             Cell currentColumnCell = null;
-            LinkedList<Cell> currentColumn = null;
+            boolean isEmptyColumn = false;
+            Cell cellAtMaxLookbackWindowStart = null;
             for (Cell cell : result) {
-                if (cell.getTimestamp() > maxLookbackWindowStart) {
+                if (cell.getTimestamp() >= maxLookbackWindowStart) {
                     retainedCells.add(cell);
+                    if (cell.getTimestamp() == maxLookbackWindowStart) {
+                        cellAtMaxLookbackWindowStart = cell;
+                    }
                     continue;
                 }
-                if (currentColumnCell == null) {
-                    currentColumn = new LinkedList<>();
-                    currentColumnCell = cell;
-                    currentColumn.add(cell);
-                } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) {
-                    columns.add(currentColumn);
-                    currentColumn = new LinkedList<>();
+                if (!major && cell.getType() != Cell.Type.Put) {
+                    retainedCells.add(cell);
+                }
+                if (currentColumnCell == null ||
+                        !CellUtil.matchingColumn(cell, currentColumnCell)) {
                     currentColumnCell = cell;
-                    currentColumn.add(cell);
-                } else {
-                    currentColumn.add(cell);
+                    if (cell.getType() != Cell.Type.Delete
+                            && cell.getType() != Cell.Type.DeleteColumn) {
+                        // Include only delete family markers and put cells
+                        // It is possible that this cell is not visible from 
the max lookback
+                        // window. This happens when there is a mutation with 
the mutation timestamp
+                        // equal to the max lookback window start timestamp. 
The following is to
+                        // check for this case
+                        if (cellAtMaxLookbackWindowStart == null
+                                || !CellUtil.matchingColumn(cell, 
cellAtMaxLookbackWindowStart)) {
+                            lastRowVersion.add(cell);
+                        }
+                    }
+                    if (major && ScanUtil.isEmptyColumn(cell, emptyCF, 
emptyCQ)) {
+                        isEmptyColumn = true;
+                    } else {
+                        isEmptyColumn = false;
+                    }
+                } else if (major && isEmptyColumn) {
+                    // We only need to keep one cell for every column for the 
last row version.
+                    // So here we just form the empty column beyond the last 
row version
+                    emptyColumn.add(cell);
                 }
             }
-            if (currentColumn != null) {
-                columns.add(currentColumn);
-            }
         }
 
         /**
@@ -576,10 +732,10 @@ public class CompactionScanner implements InternalScanner 
{
                 if (previous == -1) {
                     break;
                 }
-                if (max - ts > ttl) {
+                if (max - ts > ttlInMillis) {
                     max = input.get(previous).getTimestamp();
                     output.add(input.remove(previous));
-                    if (max - min > ttl) {
+                    if (max - min > ttlInMillis) {
                         closeGap(max, min, input, output);
                     }
                     return;
@@ -591,93 +747,88 @@ public class CompactionScanner implements InternalScanner 
{
         /**
          * Retain the last row version visible through the max lookback window
          */
-        private void retainCellsOfLastRowVersion(LinkedList<LinkedList<Cell>> 
columns,
-                List<Cell> retainedCells) {
-            if (columns.isEmpty()) {
+        private void retainCellsOfLastRowVersion(List<Cell> lastRow,
+                List<Cell> emptyColumn, List<Cell> retainedCells) {
+            if (lastRow.isEmpty()) {
                 return;
             }
-            RowContext rowContext = new RowContext();
-            rowContext.getNextRowVersionTimestamps(columns, storeColumnFamily);
-            List<Cell> retainedPutCells = new ArrayList<>();
-            for (LinkedList<Cell> column : columns) {
-                Cell cell = column.getFirst();
-                if (cell.getTimestamp() < rowContext.minTimestamp) {
-                    continue;
-                }
-                if (cell.getType() == Cell.Type.Put) {
-                    retainedPutCells.add(cell);
-                } else if (cell.getType() == Cell.Type.DeleteFamily ||
-                        cell.getType() == Cell.Type.DeleteFamilyVersion) {
-                    if (cell.getTimestamp() >= rowContext.maxTimestamp) {
-                        // This means that the row version outside the max 
lookback window is
-                        // deleted and thus should not be visible to the scn 
queries
-                        if (cell.getTimestamp() == maxLookbackWindowStart) {
-                            // Include delete markers at maxLookbackWindowStart
-                            retainedCells.add(cell);
-                        }
-                        return;
-                    }
-                } else if (cell.getTimestamp() == maxLookbackWindowStart) {
-                    // Include delete markers at maxLookbackWindowStart
-                    retainedCells.add(cell);
+            rowContext.init();
+            rowContext.getNextRowVersionTimestamps(lastRow, storeColumnFamily);
+            Cell firstCell = lastRow.get(0);
+            if (firstCell.getType() == Cell.Type.DeleteFamily ||
+                    firstCell.getType() == Cell.Type.DeleteFamilyVersion) {
+                if (firstCell.getTimestamp() >= rowContext.maxTimestamp) {
+                    // This means that the row version outside the max 
lookback window is
+                    // deleted and thus should not be visible to the scn 
queries
+                    return;
                 }
             }
-            if (compactionTime - rowContext.maxTimestamp > maxLookbackInMillis 
+ ttl) {
+
+            if (compactionTime - rowContext.maxTimestamp > maxLookbackInMillis 
+ ttlInMillis) {
                 // The row version should not be visible via the max lookback 
window. Nothing to do
                 return;
             }
-            retainedCells.addAll(retainedPutCells);
+            retainedCells.addAll(lastRow);
             // If the gap between two back to back mutations is more than ttl 
then the older
             // mutation will be considered expired and masked. If the length 
of the time range of
             // a row version is not more than ttl, then we know the cells 
covered by the row
             // version are not apart from each other more than ttl and will 
not be masked.
-            if (rowContext.maxTimestamp - rowContext.minTimestamp <= ttl) {
+            if (rowContext.maxTimestamp - rowContext.minTimestamp <= 
ttlInMillis) {
                 return;
             }
             // The quick time range check did not pass. We need get at least 
one empty cell to cover
             // the gap so that the row version will not be masked by 
PhoenixTTLRegionScanner.
-            List<Cell> emptyCellColumn = null;
-            for (LinkedList<Cell> column : columns) {
-                if (ScanUtil.isEmptyColumn(column.getFirst(), emptyCF, 
emptyCQ)) {
-                    emptyCellColumn = column;
-                    break;
-                }
-            }
-            if (emptyCellColumn == null) {
+            if (emptyColumn.isEmpty()) {
                 return;
             }
-            int size = retainedPutCells.size();
+            int size = lastRow.size();
             long tsArray[] = new long[size];
             int i = 0;
-            for (Cell cell : retainedPutCells) {
+            for (Cell cell : lastRow) {
                 tsArray[i++] = cell.getTimestamp();
             }
             Arrays.sort(tsArray);
             for (i = size - 1; i > 0; i--) {
-                if (tsArray[i] - tsArray[i - 1] > ttl) {
-                    closeGap(tsArray[i], tsArray[i - 1], emptyCellColumn, 
retainedCells);
+                if (tsArray[i] - tsArray[i - 1] > ttlInMillis) {
+                    closeGap(tsArray[i], tsArray[i - 1], emptyColumn, 
retainedCells);
                 }
             }
         }
 
+        /**
+         * The retained cells include the cells that are visible through the 
max lookback
+         * window and the additional empty column cells that are needed to 
reduce large time
+         * gaps between the cells of the last row version.
+         */
         private boolean retainCellsForMaxLookback(List<Cell> result, boolean 
regionLevel,
                 List<Cell> retainedCells) {
-            LinkedList<LinkedList<Cell>> columns = new LinkedList<>();
-            getColumns(result, columns, retainedCells);
+
+            lastRowVersion.clear();
+            emptyColumn.clear();
+            getLastRowVersionInMaxLookbackWindow(result, lastRowVersion, 
retainedCells,
+                    emptyColumn);
+            if (lastRowVersion.isEmpty()) {
+                return true;
+            }
+            if (!major) {
+                // We do not expire cells for minor compaction and memstore 
flushes
+                retainCellsOfLastRowVersion(lastRowVersion, emptyColumn, 
retainedCells);
+                return true;
+            }
             long maxTimestamp = 0;
             long minTimestamp = Long.MAX_VALUE;
             long ts;
-            for (LinkedList<Cell> column : columns) {
-                ts = column.getFirst().getTimestamp();
+            for (Cell cell : lastRowVersion) {
+                ts =cell.getTimestamp();
                 if (ts > maxTimestamp) {
                     maxTimestamp = ts;
                 }
-                ts = column.getLast().getTimestamp();
+                ts = cell.getTimestamp();
                 if (ts < minTimestamp) {
                     minTimestamp = ts;
                 }
             }
-            if (compactionTime - maxTimestamp > maxLookbackInMillis + ttl) {
+            if (compactionTime - maxTimestamp > maxLookbackInMillis + 
ttlInMillis) {
                 if (!emptyCFStore && !regionLevel) {
                     // The row version is more than maxLookbackInMillis + ttl 
old. We cannot decide
                     // if we should retain it with the store level compaction 
when the current
@@ -688,7 +839,7 @@ public class CompactionScanner implements InternalScanner {
             }
             // If the time gap between two back to back mutations is more than 
ttl then we know
             // that the row is expired within the time gap.
-            if (maxTimestamp - minTimestamp > ttl) {
+            if (maxTimestamp - minTimestamp > ttlInMillis) {
                 if ((familyCount > 1 && !regionLevel && !localIndex)) {
                     // When there are more than one column family for a given 
table and a row
                     // version constructed at the store level covers a time 
span larger than ttl,
@@ -699,38 +850,63 @@ public class CompactionScanner implements InternalScanner 
{
                     return false;
                 }
                 // We either have one column family or are doing region level 
compaction. In both
-                // case, we can safely trim the cells beyond the first time 
gap larger ttl
-                int size = result.size();
+                // case, we can safely trim the cells beyond the first time 
gap larger than ttl.
+                // Here we are interested in the gaps between the cells of the 
last row version
+                // and thus we need to examine the gaps between these cells 
and the empty column.
+                // Please note that the empty column is always updated for every 
mutation and so we
+                // just need the empty column cells for the gap analysis.
+                int size = lastRowVersion.size();
+                size += emptyColumn.size();
                 long tsArray[] = new long[size];
                 int i = 0;
-                for (Cell cell : result) {
+                for (Cell cell : lastRowVersion) {
+                    tsArray[i++] = cell.getTimestamp();
+                }
+                for (Cell cell : emptyColumn) {
                     tsArray[i++] = cell.getTimestamp();
                 }
                 Arrays.sort(tsArray);
                 boolean gapFound = false;
                 // Since timestamps are sorted in ascending order, traverse 
them in reverse order
                 for (i = size - 1; i > 0; i--) {
-                    if (tsArray[i] - tsArray[i - 1] > ttl) {
+                    if (tsArray[i] - tsArray[i - 1] > ttlInMillis) {
                         minTimestamp = tsArray[i];
                         gapFound = true;
                         break;
                     }
                 }
                 if (gapFound) {
-                    List<Cell> trimmedResult = new ArrayList<>(size - i);
-                    for (Cell cell : result) {
+                    trimmedRow.clear();
+                    for (Cell cell : lastRowVersion) {
                         if (cell.getTimestamp() >= minTimestamp) {
-                            trimmedResult.add(cell);
+                            trimmedRow.add(cell);
                         }
                     }
-                    columns.clear();
-                    retainedCells.clear();
-                    getColumns(trimmedResult, columns, retainedCells);
+                    lastRowVersion = trimmedRow;
+                    trimmedEmptyColumn.clear();;
+                    for (Cell cell : lastRowVersion) {
+                        if (cell.getTimestamp() >= minTimestamp) {
+                            trimmedEmptyColumn.add(cell);
+                        }
+                    }
+                    emptyColumn = trimmedEmptyColumn;
                 }
             }
-            retainCellsOfLastRowVersion(columns, retainedCells);
+            retainCellsOfLastRowVersion(lastRowVersion, emptyColumn, 
retainedCells);
             return true;
         }
+        private void removeDuplicates(List<Cell> input, List<Cell> output) {
+            Cell previousCell = null;
+            for (Cell cell : input) {
+                if (previousCell == null ||
+                        cell.getTimestamp() != previousCell.getTimestamp() ||
+                        cell.getType() != previousCell.getType() ||
+                        !CellUtil.matchingColumn(cell, previousCell)) {
+                    output.add(cell);
+                }
+                previousCell = cell;
+            }
+        }
         /**
          * Compacts a single row at the Phoenix level. The result parameter is 
the input row and
          * modified to be the output of the compaction process.
@@ -739,7 +915,7 @@ public class CompactionScanner implements InternalScanner {
             if (result.isEmpty()) {
                 return;
             }
-            List<Cell> phoenixResult = new ArrayList<>(result.size());
+            phoenixResult.clear();
             if (!retainCellsForMaxLookback(result, regionLevel, 
phoenixResult)) {
                 if (familyCount == 1 || regionLevel) {
                     throw new RuntimeException("UNEXPECTED");
@@ -747,14 +923,17 @@ public class CompactionScanner implements InternalScanner 
{
                 phoenixResult.clear();
                 compactRegionLevel(result, phoenixResult);
             }
-            if (maxVersion == 1 &&  minVersion == 0 && keepDeletedCells == 
KeepDeletedCells.FALSE) {
-                // We need to Phoenix level compaction only
+            if (maxVersion == 1
+                    && (!major
+                        || (minVersion == 0 && keepDeletedCells == 
KeepDeletedCells.FALSE))) {
+                // We need Phoenix level compaction only
                 Collections.sort(phoenixResult, CellComparator.getInstance());
                 result.clear();
-                result.addAll(phoenixResult);
+                removeDuplicates(phoenixResult, result);
+                phoenixLevelOnly = true;
                 return;
             }
-            // We may need to do retain more cells, and so we need to run 
HBase level compaction
+            // We may need to retain more cells, and so we need to run HBase 
level compaction
             // too. The result of two compactions will be merged and duplicate 
cells are removed.
             int phoenixResultSize = phoenixResult.size();
             List<Cell> hbaseResult = new ArrayList<>(result);
@@ -762,18 +941,7 @@ public class CompactionScanner implements InternalScanner {
             phoenixResult.addAll(hbaseResult);
             Collections.sort(phoenixResult, CellComparator.getInstance());
             result.clear();
-            Cell previousCell = null;
-            // Eliminate duplicates
-            for (Cell cell : phoenixResult) {
-                if (previousCell == null ||
-                        cell.getTimestamp() != previousCell.getTimestamp() ||
-                        cell.getType() != previousCell.getType() ||
-                        !CellUtil.matchingColumn(cell, previousCell)) {
-                    result.add(cell);
-                }
-                previousCell = cell;
-            }
-
+            removeDuplicates(phoenixResult, result);
             if (result.size() > phoenixResultSize) {
                 LOGGER.debug("HBase level compaction retained " +
                         (result.size() - phoenixResultSize) + " more cells");
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index d1b9f17112..d0e8361472 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -32,9 +32,11 @@ import java.security.PrivilegedExceptionAction;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.annotation.concurrent.GuardedBy;
 
@@ -59,6 +61,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import 
org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -174,8 +177,26 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
     private Configuration compactionConfig;
     private Configuration indexWriteConfig;
     private ReadOnlyProps indexWriteProps;
-    private boolean isPhoenixTableTTLEnabled;
 
+    private static Map<String, Long> maxLookbackMap = new 
ConcurrentHashMap<>();
+
+    public static void setMaxLookbackInMillis(String tableName, long 
maxLookbackInMillis) {
+        if (tableName == null) {
+            return;
+        }
+        maxLookbackMap.put(tableName,
+                maxLookbackInMillis);
+    }
+
+    public static long getMaxLookbackInMillis(String tableName, long 
maxLookbackInMillis) {
+        if (tableName == null) {
+            return maxLookbackInMillis;
+        }
+        Long value = maxLookbackMap.get(tableName);
+        return value == null
+                ? maxLookbackInMillis
+                : maxLookbackMap.get(tableName);
+    }
     @Override
     public Optional<RegionObserver> getRegionObserver() {
         return Optional.of(this);
@@ -205,9 +226,6 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                 
e.getConfiguration().getInt(QueryServices.INDEX_REBUILD_RPC_RETRIES_COUNTER,
                         
QueryServicesOptions.DEFAULT_INDEX_REBUILD_RPC_RETRIES_COUNTER));
         indexWriteProps = new ReadOnlyProps(indexWriteConfig.iterator());
-        isPhoenixTableTTLEnabled =
-                
e.getConfiguration().getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
-                QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED);
     }
 
     Configuration getUpsertSelectConfig() {
@@ -581,92 +599,128 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                 region.getTableDescriptor().getTableName().getName()) == 0);
     }
 
+    @Override
+    public InternalScanner 
preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+            InternalScanner scanner, FlushLifeCycleTracker tracker) throws 
IOException {
+        if (!isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
+            return scanner;
+        } else {
+            return User.runAsLoginUser(new 
PrivilegedExceptionAction<InternalScanner>() {
+                @Override public InternalScanner run() throws Exception {
+                    String tableName = 
c.getEnvironment().getRegion().getRegionInfo().getTable()
+                            .getNameAsString();
+                    long maxLookbackInMillis =
+                            
UngroupedAggregateRegionObserver.getMaxLookbackInMillis(
+                                    tableName,
+                                    
BaseScannerRegionObserverConstants.getMaxLookbackInMillis(
+                                            
c.getEnvironment().getConfiguration()));
+                    maxLookbackInMillis = 
CompactionScanner.getMaxLookbackInMillis(tableName,
+                            store.getColumnFamilyName(), maxLookbackInMillis);
+                    return new CompactionScanner(c.getEnvironment(), store, 
scanner,
+                            maxLookbackInMillis, null, null, false, true);
+                }
+            });
+        }
+    }
+
     @Override
     public InternalScanner 
preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
                                       InternalScanner scanner, ScanType 
scanType, CompactionLifeCycleTracker tracker,
                                       CompactionRequest request) throws 
IOException {
-        if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
-            final TableName tableName = 
c.getEnvironment().getRegion().getRegionInfo().getTable();
-            // Compaction and split upcalls run with the effective user 
context of the requesting user.
-            // This will lead to failure of cross cluster RPC if the effective 
user is not
-            // the login user. Switch to the login user context to ensure we 
have the expected
-            // security context.
-            return User.runAsLoginUser(new 
PrivilegedExceptionAction<InternalScanner>() {
-                @Override
-                public InternalScanner run() throws Exception {
-                    InternalScanner internalScanner = scanner;
-                    if (request.isMajor()) {
-                        boolean isDisabled = false;
-                        boolean isMultiTenantIndexTable = false;
-                        if 
(tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
-                            isMultiTenantIndexTable = true;
-                        }
-                        final String fullTableName = isMultiTenantIndexTable ?
-                                
SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(),
-                                        MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) :
-                                tableName.getNameAsString();
-                        PTable table = null;
-                        try (PhoenixConnection conn = 
QueryUtil.getConnectionOnServer(
-                                
compactionConfig).unwrap(PhoenixConnection.class)) {
-                            table = conn.getTableNoCache(fullTableName);
-                        } catch (Exception e) {
-                            if (e instanceof TableNotFoundException) {
-                                LOGGER.debug("Ignoring HBase table that is not 
a Phoenix table: "
-                                        + fullTableName);
-                                // non-Phoenix HBase tables won't be found, do 
nothing
-                            } else {
-                                LOGGER.error(
-                                        "Unable to modify compaction scanner 
to retain deleted "
-                                                + "cells for a table with 
disabled Index; "
-                                                + fullTableName, e);
-                            }
-                        }
-                        // The previous indexing design needs to retain delete 
markers and deleted
-                        // cells to rebuild disabled indexes. Thus, we skip 
major compaction for
-                        // them. GlobalIndexChecker is the coprocessor 
introduced by the current
-                        // indexing design.
-                        if (table != null &&
-                                
!PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName) &&
-                                !ServerUtil.hasCoprocessor(c.getEnvironment(),
-                                GlobalIndexChecker.class.getName())) {
-                            List<PTable>
-                                    indexes =
-                                    PTableType.INDEX.equals(table.getType()) ?
-                                            Lists.newArrayList(table) :
-                                            table.getIndexes();
-                            // FIXME need to handle views and indexes on views 
as well
-                            for (PTable index : indexes) {
-                                if (index.getIndexDisableTimestamp() != 0) {
-                                    LOGGER.info("Modifying major compaction 
scanner to retain "
-                                            + "deleted cells for a table with 
disabled index: "
-                                            + fullTableName);
-                                    isDisabled = true;
-                                    break;
-                                }
-                            }
-                        }
-                        if (table != null && !isDisabled && 
isPhoenixTableTTLEnabled) {
-                            internalScanner =
-                                    new CompactionScanner(c.getEnvironment(), 
store, scanner,
-                                            MetaDataUtil.getMaxLookbackAge(
-                                                    
c.getEnvironment().getConfiguration(), table.getMaxLookbackAge()),
-                                            
SchemaUtil.getEmptyColumnFamily(table),
-                                            table.getEncodingScheme()
-                                                    == 
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
-                                                    
QueryConstants.EMPTY_COLUMN_BYTES :
-                                                    
table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME)
-                                            );
+
+        final TableName tableName = 
c.getEnvironment().getRegion().getRegionInfo().getTable();
+        // Compaction and split upcalls run with the effective user context of 
the requesting user.
+        // This will lead to failure of cross cluster RPC if the effective 
user is not
+        // the login user. Switch to the login user context to ensure we have 
the expected
+        // security context.
+        return User.runAsLoginUser(new 
PrivilegedExceptionAction<InternalScanner>() {
+            @Override
+            public InternalScanner run() throws Exception {
+                InternalScanner internalScanner = scanner;
+                boolean keepDeleted = false;
+                boolean isMultiTenantIndexTable = false;
+                if 
(tableName.getNameAsString().startsWith(MetaDataUtil.VIEW_INDEX_TABLE_PREFIX)) {
+                    isMultiTenantIndexTable = true;
+                }
+                final String fullTableName = isMultiTenantIndexTable ?
+                        
SchemaUtil.getParentTableNameFromIndexTable(tableName.getNameAsString(),
+                                MetaDataUtil.VIEW_INDEX_TABLE_PREFIX) :
+                        tableName.getNameAsString();
+                PTable table = null;
+                Long maxLookbackAge = null;
+                try (PhoenixConnection conn = QueryUtil.getConnectionOnServer(
+                        compactionConfig).unwrap(PhoenixConnection.class)) {
+                    table = conn.getTableNoCache(fullTableName);
+                    maxLookbackAge = table.getMaxLookbackAge();
+                    UngroupedAggregateRegionObserver.setMaxLookbackInMillis(
+                            tableName.getNameAsString(),
+                            MetaDataUtil.getMaxLookbackAge(
+                                    c.getEnvironment().getConfiguration(),
+                                    maxLookbackAge));
+                } catch (Exception e) {
+                    if (e instanceof TableNotFoundException) {
+                        LOGGER.debug("Ignoring HBase table that is not a 
Phoenix table: "
+                                + fullTableName);
+                        // non-Phoenix HBase tables won't be found, do nothing
+                    } else {
+                        LOGGER.error(
+                                "Unable to modify compaction scanner to retain 
deleted "
+                                        + "cells for a table with disabled 
Index; "
+                                        + fullTableName, e);
+                    }
+                }
+                // The previous indexing design needs to retain delete markers 
and deleted
+                // cells to rebuild disabled indexes. Thus, we skip major 
compaction for
+                // them. GlobalIndexChecker is the coprocessor introduced by 
the current
+                // indexing design.
+                if (table != null &&
+                        
!PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME.equals(fullTableName) &&
+                        !ServerUtil.hasCoprocessor(c.getEnvironment(),
+                        GlobalIndexChecker.class.getName())) {
+                    List<PTable>
+                            indexes =
+                            PTableType.INDEX.equals(table.getType()) ?
+                                    Lists.newArrayList(table) :
+                                    table.getIndexes();
+                    // FIXME need to handle views and indexes on views as well
+                    for (PTable index : indexes) {
+                        if (index.getIndexDisableTimestamp() != 0) {
+                            LOGGER.info("Modifying major compaction scanner to 
retain "
+                                    + "deleted cells for a table with disabled 
index: "
+                                    + fullTableName);
+                            keepDeleted = true;
+                            break;
                         }
                     }
+                }
+                if (table != null
+                        && 
isPhoenixTableTTLEnabled(c.getEnvironment().getConfiguration())) {
+                    internalScanner =
+                            new CompactionScanner(c.getEnvironment(), store, 
scanner,
+                                    MetaDataUtil.getMaxLookbackAge(
+                                            
c.getEnvironment().getConfiguration(),
+                                            maxLookbackAge),
+                                    SchemaUtil.getEmptyColumnFamily(table),
+                                    table.getEncodingScheme()
+                                            == 
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
+                                            QueryConstants.EMPTY_COLUMN_BYTES :
+                                            
table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME),
+                                    request.isMajor() || request.isAllFiles(), 
keepDeleted
+                                    );
+                }
+                if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
                     try {
                         long clientTimeStamp = 
EnvironmentEdgeManager.currentTimeMillis();
-                        DelegateRegionCoprocessorEnvironment compactionConfEnv 
=
+                        DelegateRegionCoprocessorEnvironment
+                                compactionConfEnv =
                                 new 
DelegateRegionCoprocessorEnvironment(c.getEnvironment(),
                                         ConnectionType.COMPACTION_CONNECTION);
-                        StatisticsCollector statisticsCollector =
+                        StatisticsCollector
+                                statisticsCollector =
                                 
StatisticsCollectorFactory.createStatisticsCollector(
-                                compactionConfEnv, 
tableName.getNameAsString(), clientTimeStamp,
-                                store.getColumnFamilyDescriptor().getName());
+                                        compactionConfEnv, 
tableName.getNameAsString(),
+                                        clientTimeStamp,
+                                        
store.getColumnFamilyDescriptor().getName());
                         statisticsCollector.init();
                         internalScanner =
                                 
statisticsCollector.createCompactionScanner(compactionConfEnv,
@@ -678,11 +732,10 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                             LOGGER.warn("Unable to collect stats for " + 
tableName, e);
                         }
                     }
-                    return internalScanner;
                 }
-            });
-        }
-        return scanner;
+                return internalScanner;
+            }
+        });
     }
 
     static PTable deserializeTable(byte[] b) {
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
index f9900fdb7b..e7157aee95 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MaxLookbackExtendedIT.java
@@ -22,12 +22,10 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.CompactionScanner;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixResultSet;
-import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.schema.PTable;
@@ -50,9 +48,7 @@ import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.sql.Connection;
-import java.sql.Date;
 import java.sql.DriverManager;
-import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -288,11 +284,12 @@ public class MaxLookbackExtendedIT extends BaseTest {
             long beforeFirstCompactSCN = 
EnvironmentEdgeManager.currentTimeMillis();
             injectEdge.incrementValue(1); //new ts for major compaction
             majorCompact(dataTable);
-            majorCompact(indexTable);
             assertRawRowCount(conn, dataTable, rowsPlusDeleteMarker);
+            majorCompact(indexTable);
             assertRawRowCount(conn, indexTable, rowsPlusDeleteMarker);
             //wait for the lookback time. After this compactions should purge the deleted row
-            long timeToAdvance = hasTableLevelMaxLookback ? 
TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
+            long timeToAdvance = hasTableLevelMaxLookback
+                    ? TABLE_LEVEL_MAX_LOOKBACK_AGE : MAX_LOOKBACK_AGE;
             injectEdge.incrementValue(timeToAdvance * 1000);
             long beforeSecondCompactSCN = 
EnvironmentEdgeManager.currentTimeMillis();
             String notDeletedRowSql =
@@ -311,10 +308,14 @@ public class MaxLookbackExtendedIT extends BaseTest {
                 Assert.assertTrue(EnvironmentEdgeManager.currentTimeMillis() <
                         beforeDeleteSCN + MAX_LOOKBACK_AGE * 1000);
             }
+            flush(dataTable);
             majorCompact(dataTable);
-            majorCompact(indexTable);
             // Deleted row versions should be removed.
             assertRawRowCount(conn, dataTable, ROWS_POPULATED);
+
+            flush(indexTable);
+            majorCompact(indexTable);
+            // Deleted row versions should be removed.
             assertRawRowCount(conn, indexTable, ROWS_POPULATED);
 
             //deleted row should be gone, but not deleted row should still be there.
@@ -425,7 +426,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
             assertRawRowCount(conn, dataTable, originalRowCount);
             assertRawRowCount(conn, indexTable, originalRowCount);
             assertExplainPlan(conn, indexSql, dataTableName, indexName);
-            long timeToAdvance = (MAX_LOOKBACK_AGE * 1000) -
+            long timeToAdvance = (MAX_LOOKBACK_AGE * 1000L) -
                 (EnvironmentEdgeManager.currentTimeMillis() - 
afterFirstInsertSCN);
             if (timeToAdvance > 0) {
                 injectEdge.incrementValue(timeToAdvance);
@@ -440,7 +441,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
             assertRawRowCount(conn, dataTable, originalRowCount);
             assertRawRowCount(conn, indexTable, originalRowCount);
             //now wait the TTL
-            timeToAdvance = (ttl * 1000) -
+            timeToAdvance = (ttl * 1000L) -
                 (EnvironmentEdgeManager.currentTimeMillis() - 
afterFirstInsertSCN);
             if (timeToAdvance > 0) {
                 injectEdge.incrementValue(timeToAdvance);
@@ -458,7 +459,7 @@ public class MaxLookbackExtendedIT extends BaseTest {
             Assert.assertEquals(0, rs.getInt(1));
             // Increment the time by max lookback age and make sure that we can compact away
             // the now-expired rows
-            injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000);
+            injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000L);
             majorCompact(dataTable);
             majorCompact(indexTable);
             assertRawRowCount(conn, dataTable, 0);
@@ -579,34 +580,35 @@ public class MaxLookbackExtendedIT extends BaseTest {
                 CompactionScanner.overrideMaxLookback(tableNameTwo, "A", 
maxLookbackTwo * 1000);
                 CompactionScanner.overrideMaxLookback(tableNameTwo, "B", 
maxLookbackTwo * 1000);
             }
-            injectEdge.incrementValue(1);
+            injectEdge.incrementValue(1000);
             populateTable(tableNameOne);
             populateTable(tableNameTwo);
-            injectEdge.incrementValue(1);
+            injectEdge.incrementValue(1000);
             populateTable(tableNameOne);
             populateTable(tableNameTwo);
-            injectEdge.incrementValue(1);
+            injectEdge.incrementValue(1000);
             conn.createStatement().executeUpdate("DELETE FROM " + 
tableNameOne);
             conn.createStatement().executeUpdate("DELETE FROM " + 
tableNameTwo);
             conn.commit();
-            injectEdge.incrementValue(1);
             // Move the time so that deleted row versions will be outside the maxlookback window
-            // of tableNameOne but the delete markers will be inside
+            // of tableNameOne but the delete markers will be inside (in this case on the lower
+            // edge of the window)
             injectEdge.incrementValue(maxLookbackOne * 1000);
-            // Compact both tables. Deleted row versions will be removed from 
tableNameOne as they
-            // are now outside its max lookback window.
+            // Compact both tables. Deleted row versions will be retained since the delete marker
+            // is retained. This is necessary to include the preimage of the row in the CDC
+            // stream.
             flush(TableName.valueOf(tableNameOne));
             flush(TableName.valueOf(tableNameTwo));
             majorCompact(TableName.valueOf(tableNameOne));
             majorCompact(TableName.valueOf(tableNameTwo));
             if (multiCF) {
-                assertRawCellCount(conn, TableName.valueOf(tableNameOne), 
Bytes.toBytes("a"), 3);
-                assertRawCellCount(conn, TableName.valueOf(tableNameOne), 
Bytes.toBytes("b"), 3);
+                assertRawCellCount(conn, TableName.valueOf(tableNameOne), 
Bytes.toBytes("a"), 7);
+                assertRawCellCount(conn, TableName.valueOf(tableNameOne), 
Bytes.toBytes("b"), 7);
                 assertRawCellCount(conn, TableName.valueOf(tableNameTwo), 
Bytes.toBytes("a"), 11);
                 assertRawCellCount(conn, TableName.valueOf(tableNameTwo), 
Bytes.toBytes("b"), 11);
             } else {
-                assertRawCellCount(conn, TableName.valueOf(tableNameOne), 
Bytes.toBytes("a"), 1);
-                assertRawCellCount(conn, TableName.valueOf(tableNameOne), 
Bytes.toBytes("b"), 1);
+                assertRawCellCount(conn, TableName.valueOf(tableNameOne), 
Bytes.toBytes("a"), 5);
+                assertRawCellCount(conn, TableName.valueOf(tableNameOne), 
Bytes.toBytes("b"), 5);
                 assertRawCellCount(conn, TableName.valueOf(tableNameTwo), 
Bytes.toBytes("a"), 9);
                 assertRawCellCount(conn, TableName.valueOf(tableNameTwo), 
Bytes.toBytes("b"), 9);
             }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
index 3b6d1277e8..591016b37d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TableTTLIT.java
@@ -53,6 +53,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 
+import static org.junit.Assert.assertTrue;
+
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class TableTTLIT extends BaseTest {
@@ -158,34 +160,46 @@ public class TableTTLIT extends BaseTest {
         final int maxLookbackAge = tableLevelMaxLooback != null ? 
tableLevelMaxLooback : MAX_LOOKBACK_AGE;
         final int maxDeleteCounter = maxLookbackAge == 0 ? 1 : maxLookbackAge;
         final int maxCompactionCounter = ttl / 2;
+        final int maxFlushCounter = ttl;
         final int maxMaskingCounter = 2 * ttl;
+        final int maxVerificationCounter = 2 * ttl;
         final byte[] rowKey = Bytes.toBytes("a");
         try (Connection conn = DriverManager.getConnection(getUrl())) {
             String tableName = generateUniqueName();
             createTable(tableName);
+            conn.createStatement().execute("Alter Table " + tableName + " set 
\"phoenix.max.lookback.age.seconds\" = " + maxLookbackAge);
+            conn.commit();
             String noCompactTableName = generateUniqueName();
             createTable(noCompactTableName);
+            conn.createStatement().execute("Alter Table " + noCompactTableName 
+ " set \"phoenix.table.ttl.enabled\" = false");
+            conn.commit();
             long startTime = System.currentTimeMillis() + 1000;
             startTime = (startTime / 1000) * 1000;
             EnvironmentEdgeManager.injectEdge(injectEdge);
             injectEdge.setValue(startTime);
             int deleteCounter = RAND.nextInt(maxDeleteCounter) + 1;
             int compactionCounter = RAND.nextInt(maxCompactionCounter) + 1;
+            int flushCounter = RAND.nextInt(maxFlushCounter) + 1;
             int maskingCounter = RAND.nextInt(maxMaskingCounter) + 1;
-            boolean afterCompaction = false;
+            int verificationCounter = RAND.nextInt(maxVerificationCounter) + 1;
             for (int i = 0; i < 500; i++) {
+                if (flushCounter-- == 0) {
+                    injectEdge.incrementValue(1000);
+                    LOG.info("Flush " + i + " current time: " + 
injectEdge.currentTime());
+                    flush(TableName.valueOf(tableName));
+                    flushCounter = RAND.nextInt(maxFlushCounter) + 1;
+                }
                 if (compactionCounter-- == 0) {
                     injectEdge.incrementValue(1000);
-                    LOG.debug("Compaction " + i + " current time: " + 
injectEdge.currentTime());
+                    LOG.info("Compaction " + i + " current time: " + 
injectEdge.currentTime());
                     flush(TableName.valueOf(tableName));
                     majorCompact(TableName.valueOf(tableName));
                     compactionCounter = RAND.nextInt(maxCompactionCounter) + 1;
-                    afterCompaction = true;
                 }
                 if (maskingCounter-- == 0) {
                     updateRow(conn, tableName, noCompactTableName, "a");
                     injectEdge.incrementValue((ttl + maxLookbackAge + 1) * 
1000);
-                    LOG.debug("Masking " + i + " current time: " + 
injectEdge.currentTime());
+                    LOG.info("Masking " + i + " current time: " + 
injectEdge.currentTime());
                     ResultSet rs = conn.createStatement().executeQuery(
                             "SELECT count(*) FROM " + tableName);
                     Assert.assertTrue(rs.next());
@@ -200,20 +214,18 @@ public class TableTTLIT extends BaseTest {
                     maskingCounter = RAND.nextInt(maxMaskingCounter) + 1;
                 }
                 if (deleteCounter-- == 0) {
-                    LOG.debug("Delete " + i + " current time: " + 
injectEdge.currentTime());
+                    LOG.info("Delete " + i + " current time: " + 
injectEdge.currentTime());
                     deleteRow(conn, tableName, "a");
                     deleteRow(conn, noCompactTableName, "a");
                     deleteCounter = RAND.nextInt(maxDeleteCounter) + 1;
                     injectEdge.incrementValue(1000);
                 }
+                injectEdge.incrementValue(1000);
                 updateRow(conn, tableName, noCompactTableName, "a");
-                if (!afterCompaction) {
-                    injectEdge.incrementValue(1000);
-                    // We are interested in the correctness of compaction and 
masking. Thus, we
-                    // only need to do the latest version and scn queries to 
after compaction.
+                if (verificationCounter-- >  0) {
                     continue;
                 }
-                afterCompaction = false;
+                verificationCounter = RAND.nextInt(maxVerificationCounter) + 1;
                 compareRow(conn, tableName, noCompactTableName, "a", 
MAX_COLUMN_INDEX);
                 long scn = injectEdge.currentTime() - maxLookbackAge * 1000;
                 long scnEnd = injectEdge.currentTime();
@@ -227,7 +239,6 @@ public class TableTTLIT extends BaseTest {
                                 MAX_COLUMN_INDEX);
                     }
                 }
-                injectEdge.incrementValue(1000);
             }
         }
     }
@@ -257,16 +268,16 @@ public class TableTTLIT extends BaseTest {
                 // At every flush, extra cell versions should be removed.
                 // MAX_COLUMN_INDEX table columns and one empty column will be retained for
                 // each row version.
-                TestUtil.assertRawCellCount(conn, 
TableName.valueOf(tableName), row,
-                        (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
+                assertTrue(TestUtil.getRawCellCount(conn, 
TableName.valueOf(tableName), row)
+                        <= (i + 1) * (MAX_COLUMN_INDEX + 1) * versions);
             }
             // Run one minor compaction (in case no minor compaction has happened yet)
             Admin admin = utility.getAdmin();
             admin.compact(TableName.valueOf(tableName));
             int waitCount = 0;
             while (TestUtil.getRawCellCount(conn, TableName.valueOf(tableName),
-                    Bytes.toBytes("a")) < flushCount * (MAX_COLUMN_INDEX + 1) 
* versions) {
-                // Wait for major compactions to happen
+                    Bytes.toBytes("a")) >= flushCount * (MAX_COLUMN_INDEX + 1) 
* versions) {
+                // Wait for minor compactions to happen
                 Thread.sleep(1000);
                 waitCount++;
                 if (waitCount > 30) {
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index c15bd407c9..649c48844d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -977,7 +977,6 @@ public class TestUtil {
     public static CellCount getCellCount(Table table, boolean isRaw) throws 
IOException {
         Scan s = new Scan();
         s.setRaw(isRaw);
-        ;
         s.readAllVersions();
 
         CellCount cellCount = new CellCount();

Reply via email to