[ 
https://issues.apache.org/jira/browse/PHOENIX-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17696321#comment-17696321
 ] 

ASF GitHub Bot commented on PHOENIX-6888:
-----------------------------------------

gjacoby126 commented on code in PR #1569:
URL: https://github.com/apache/phoenix/pull/1569#discussion_r1124884514


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/StoreCompactionScanner.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The store scanner that implements Phoenix TTL and Max Lookback
+ */
+public class StoreCompactionScanner implements InternalScanner {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StoreCompactionScanner.class);
+    private final InternalScanner storeScanner;
+    private final Region region;
+    private final Store store;
+    private final Configuration config;
+    private final RegionCoprocessorEnvironment env;
+    private long maxLookbackWindowStart;
+    private long ttlWindowStart;
+    private int minVersion;
+    private int maxVersion;
+    private final boolean firstStore;
+    private KeepDeletedCells keepDeletedCells;
+    private long compactionTime;
+
+    public StoreCompactionScanner(RegionCoprocessorEnvironment env,
+                                Store store,
+                                InternalScanner storeScanner,
+                                long maxLookbackInMs) {
+        this.storeScanner = storeScanner;
+        this.region = env.getRegion();
+        this.store = store;
+        this.env = env;
+        this.config = env.getConfiguration();
+        compactionTime = EnvironmentEdgeManager.currentTimeMillis();
+        this.maxLookbackWindowStart = compactionTime - maxLookbackInMs;
+        ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
+        long ttl = cfd.getTimeToLive();

Review Comment:
   We're taking the TTL, min versions, and max versions from the 
ColumnFamilyDescriptor, which will be correct almost all the time. But what 
if some other coproc (not part of open-source Phoenix) modifies these values 
through ScanOptions, similar to what max lookback does?
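To make the concern concrete, here is a minimal sketch that does not depend on HBase. The class and method names (`EffectiveRetention`, `effectiveTtlSeconds`) are hypothetical, and the `OptionalInt` only stands in for a value another coprocessor might have set through ScanOptions. It models one possible precedence rule (an override, if present, wins over the descriptor value) and is not what the PR does.

```java
import java.util.OptionalInt;

// Hypothetical illustration only: none of these names exist in Phoenix or HBase.
final class EffectiveRetention {

    // Returns the override TTL when another component supplied one,
    // otherwise falls back to the column family descriptor's TTL.
    static int effectiveTtlSeconds(int descriptorTtlSeconds, OptionalInt overrideTtlSeconds) {
        return overrideTtlSeconds.orElse(descriptorTtlSeconds);
    }

    public static void main(String[] args) {
        // Descriptor says one hour, but an override shortened it to one minute.
        System.out.println(effectiveTtlSeconds(3600, OptionalInt.of(60)));
        // No override: the descriptor value is used.
        System.out.println(effectiveTtlSeconds(3600, OptionalInt.empty()));
    }
}
```

A scanner that reads only the descriptor would use 3600 in the first case, which is the mismatch the comment asks about.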



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/StoreCompactionScanner.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The store scanner that implements Phoenix TTL and Max Lookback
+ */
+public class StoreCompactionScanner implements InternalScanner {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StoreCompactionScanner.class);
+    private final InternalScanner storeScanner;
+    private final Region region;
+    private final Store store;
+    private final Configuration config;
+    private final RegionCoprocessorEnvironment env;
+    private long maxLookbackWindowStart;
+    private long ttlWindowStart;
+    private int minVersion;
+    private int maxVersion;
+    private final boolean firstStore;
+    private KeepDeletedCells keepDeletedCells;
+    private long compactionTime;
+
+    public StoreCompactionScanner(RegionCoprocessorEnvironment env,
+                                Store store,
+                                InternalScanner storeScanner,
+                                long maxLookbackInMs) {
+        this.storeScanner = storeScanner;
+        this.region = env.getRegion();
+        this.store = store;
+        this.env = env;
+        this.config = env.getConfiguration();
+        compactionTime = EnvironmentEdgeManager.currentTimeMillis();
+        this.maxLookbackWindowStart = compactionTime - maxLookbackInMs;
+        ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
+        long ttl = cfd.getTimeToLive();
+        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - 
ttl * 1000;
+        this.minVersion = cfd.getMinVersions();
+        this.maxVersion = cfd.getMaxVersions();
+        this.keepDeletedCells = cfd.getKeepDeletedCells();
+        firstStore = region.getStores().get(0).getColumnFamilyName().
+                equals(store.getColumnFamilyName());
+    }
+
+    @Override
+    public boolean next(List<Cell> result) throws IOException {
+        synchronized (storeScanner) {
+            boolean hasMore = storeScanner.next(result);
+            filter(result, true);
+            Collections.sort(result, CellComparator.getInstance());
+            return hasMore;
+        }
+    }
+
+    @Override
+    public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+        return next(result);
+    }
+
+    @Override
+    public void close() throws IOException {
+        storeScanner.close();
+    }
+    private void formColumns(List<Cell> result, List<List<Cell>> columns,
+            List<Cell> deleteMarkers) {
+        Cell currentColumnCell = null;
+        List<Cell> currentColumn = null;
+        for (Cell cell : result) {
+            if (cell.getType() != Cell.Type.Put) {
+                deleteMarkers.add(cell);
+            }
+            if (currentColumnCell == null) {
+                currentColumn = new ArrayList<>();
+                currentColumnCell = cell;
+                currentColumn.add(cell);
+            }
+            else if (Bytes.compareTo(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+                    cell.getQualifierLength(),
+                    currentColumnCell.getQualifierArray(), 
currentColumnCell.getQualifierOffset(),
+                    currentColumnCell.getQualifierLength()) != 0) {
+                columns.add(currentColumn);
+                currentColumn = new ArrayList<>();
+                currentColumnCell = cell;
+                currentColumn.add(cell);
+            }
+            else {
+                currentColumn.add(cell);
+            }
+        }
+        if (currentColumn != null) {
+            columns.add(currentColumn);
+        }
+    }
+
+    /**
+     * A row version that does not share a cell with any other row version is 
called a
+     * compaction row version.
+     * The latest live or deleted row version at the compaction time 
(compactionTime) is the first
+     * compaction row version. The next row version which does not share a 
cell with the
+     * first compaction row version is the next compaction row version.
+     *
+     * The first compaction row version is a valid row version (i.e., a row 
version at a given
+     * time). The subsequent compactions row versions may not represent a 
valid row version if
+     * the rows are updated partially.
+     *
+     * Compaction row versions are used for compaction purposes to determine 
which row versions to
+     * retain.
+     */
+    class CompactionRowVersion {
+        // Cells included in the row version
+        List<Cell> cells = new ArrayList<>();
+        // The delete marker deleting this row version
+        Cell deleteFamilyMarker = null;
+        // Delete or DeleteColumn markers deleting a cell of this version
+        List<Cell> columnDeleteMarkers = null;
+        // The timestamp of the row version
+        long ts = 0;
+        // The version of a row version. It is the minimum of the versions of 
the cells included
+        // in the row version
+        int version = 0;
+        private void addColumnDeleteMarker(Cell deleteMarker) {
+            if (columnDeleteMarkers == null) {
+                columnDeleteMarkers = new ArrayList<>();
+            }
+            columnDeleteMarkers.add(deleteMarker);
+        }
+    }
+
+    private boolean isCellDeleted(List<Cell> deleteMarkers, 
CompactionRowVersion rowVersion,
+            Cell cell, boolean storeLevel) {
+        int i = 0;
+        for (Cell dm : deleteMarkers) {
+            if ((storeLevel ||
+                    Bytes.compareTo(cell.getFamilyArray(), 
cell.getFamilyOffset(),
+                    cell.getFamilyLength(),
+                    dm.getFamilyArray(), dm.getFamilyOffset(), 
dm.getFamilyLength()) == 0) &&
+                    Bytes.compareTo(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+                            cell.getQualifierLength(),
+                            dm.getQualifierArray(), dm.getQualifierOffset(),
+                            dm.getQualifierLength()) == 0) {
+                if (dm.getType() == Cell.Type.Delete) {

Review Comment:
   Is this because a Type.Delete will wipe out the whole row, and hence any 
DeleteColumn / DeleteColumnVersions / DeleteFamily / DeleteFamilyVersions that 
came before? It would be good to mention this in a comment.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/StoreCompactionScanner.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.*;

Review Comment:
   nit: star import



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/StoreCompactionScanner.java:
##########
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The store scanner that implements Phoenix TTL and Max Lookback
+ */
+public class StoreCompactionScanner implements InternalScanner {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StoreCompactionScanner.class);
+    private final InternalScanner storeScanner;
+    private final Region region;
+    private final Store store;
+    private final Configuration config;
+    private final RegionCoprocessorEnvironment env;
+    private long maxLookbackWindowStart;
+    private long ttlWindowStart;
+    private int minVersion;
+    private int maxVersion;
+    private final boolean firstStore;
+    private KeepDeletedCells keepDeletedCells;
+    private long compactionTime;
+
+    public StoreCompactionScanner(RegionCoprocessorEnvironment env,
+                                Store store,
+                                InternalScanner storeScanner,
+                                long maxLookbackInMs) {
+        this.storeScanner = storeScanner;
+        this.region = env.getRegion();
+        this.store = store;
+        this.env = env;
+        this.config = env.getConfiguration();
+        compactionTime = EnvironmentEdgeManager.currentTimeMillis();
+        this.maxLookbackWindowStart = compactionTime - maxLookbackInMs;
+        ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
+        long ttl = cfd.getTimeToLive();
+        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - 
ttl * 1000;
+        this.minVersion = cfd.getMinVersions();
+        this.maxVersion = cfd.getMaxVersions();
+        this.keepDeletedCells = cfd.getKeepDeletedCells();
+        firstStore = region.getStores().get(0).getColumnFamilyName().
+                equals(store.getColumnFamilyName());
+    }
+
+    @Override
+    public boolean next(List<Cell> result) throws IOException {
+        synchronized (storeScanner) {
+            boolean hasMore = storeScanner.next(result);
+            filter(result, true);
+            Collections.sort(result, CellComparator.getInstance());
+            return hasMore;
+        }
+    }
+
+    @Override
+    public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+        return next(result);
+    }
+
+    @Override
+    public void close() throws IOException {
+        storeScanner.close();
+    }
+    private void formColumns(List<Cell> result, List<List<Cell>> columns,
+            List<Cell> deleteMarkers) {
+        Cell currentColumnCell = null;
+        List<Cell> currentColumn = null;
+        for (Cell cell : result) {
+            if (cell.getType() != Cell.Type.Put) {
+                deleteMarkers.add(cell);
+            }
+            if (currentColumnCell == null) {
+                currentColumn = new ArrayList<>();
+                currentColumnCell = cell;
+                currentColumn.add(cell);
+            }
+            else if (Bytes.compareTo(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+                    cell.getQualifierLength(),
+                    currentColumnCell.getQualifierArray(), 
currentColumnCell.getQualifierOffset(),
+                    currentColumnCell.getQualifierLength()) != 0) {
+                columns.add(currentColumn);
+                currentColumn = new ArrayList<>();
+                currentColumnCell = cell;
+                currentColumn.add(cell);
+            }
+            else {
+                currentColumn.add(cell);
+            }
+        }
+        if (currentColumn != null) {
+            columns.add(currentColumn);
+        }
+    }
+
+    /**
+     * A row version that does not share a cell with any other row version is 
called a
+     * compaction row version.
+     * The latest live or deleted row version at the compaction time 
(compactionTime) is the first
+     * compaction row version. The next row version which does not share a 
cell with the
+     * first compaction row version is the next compaction row version.
+     *
+     * The first compaction row version is a valid row version (i.e., a row 
version at a given
+     * time). The subsequent compactions row versions may not represent a 
valid row version if
+     * the rows are updated partially.
+     *
+     * Compaction row versions are used for compaction purposes to determine 
which row versions to
+     * retain.
+     */
+    class CompactionRowVersion {
+        // Cells included in the row version
+        List<Cell> cells = new ArrayList<>();
+        // The delete marker deleting this row version
+        Cell deleteFamilyMarker = null;
+        // Delete or DeleteColumn markers deleting a cell of this version
+        List<Cell> columnDeleteMarkers = null;
+        // The timestamp of the row version
+        long ts = 0;
+        // The version of a row version. It is the minimum of the versions of 
the cells included
+        // in the row version
+        int version = 0;
+        private void addColumnDeleteMarker(Cell deleteMarker) {
+            if (columnDeleteMarkers == null) {
+                columnDeleteMarkers = new ArrayList<>();
+            }
+            columnDeleteMarkers.add(deleteMarker);
+        }
+    }
+
+    private boolean isCellDeleted(List<Cell> deleteMarkers, 
CompactionRowVersion rowVersion,
+            Cell cell, boolean storeLevel) {
+        int i = 0;
+        for (Cell dm : deleteMarkers) {
+            if ((storeLevel ||
+                    Bytes.compareTo(cell.getFamilyArray(), 
cell.getFamilyOffset(),
+                    cell.getFamilyLength(),
+                    dm.getFamilyArray(), dm.getFamilyOffset(), 
dm.getFamilyLength()) == 0) &&
+                    Bytes.compareTo(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+                            cell.getQualifierLength(),
+                            dm.getQualifierArray(), dm.getQualifierOffset(),
+                            dm.getQualifierLength()) == 0) {
+                if (dm.getType() == Cell.Type.Delete) {
+                    deleteMarkers.remove(i);
+                    if (rowVersion.columnDeleteMarkers == null) {
+                        rowVersion.columnDeleteMarkers = new ArrayList<>();
+                    }
+                    rowVersion.columnDeleteMarkers.add(dm);
+                }
+                return true;
+            }
+            i++;
+        }
+        return false;
+    }
+
+    private long getNextRowVersionTimestamp(List<List<Cell>> columns) {
+        long ts = 0;
+        for (List<Cell> column : columns) {
+            Cell firstCell = column.get(0);
+            if (firstCell.getType() == Cell.Type.DeleteFamily ||
+                    firstCell.getType() == Cell.Type.DeleteFamilyVersion) {
+                break;
+            }
+            if (firstCell.getType() == Cell.Type.DeleteColumn ||
+                    firstCell.getType() == Cell.Type.Delete) {
+                continue;
+            }
+            if (ts < firstCell.getTimestamp()) {
+                ts = firstCell.getTimestamp();
+            }
+        }
+        return ts;
+    }
+
+    private boolean formRowVersions(List<List<Cell>> columns,
+                                    List<CompactionRowVersion> rowVersions,
+                                    boolean storeLevel) {
+        Cell lastDeleteFamilyMarker = null;
+        Cell lastDeleteFamilyVersionMarker = null;
+        List<Cell> columnDeleteMarkers = null;
+        int version = 0;
+        while (!columns.isEmpty()) {
+            long ts = getNextRowVersionTimestamp(columns);
+            CompactionRowVersion rowVersion = null;
+            // Form the next row version by picking the first cell from each 
column if the cell
+            // is not masked by a delete marker
+            for (List<Cell> column : columns) {
+                Cell firstCell = column.remove(0);
+                if (firstCell.getType() == Cell.Type.DeleteFamily ||
+                        firstCell.getType() == Cell.Type.DeleteFamilyVersion) {
+                    if (firstCell.getType() == Cell.Type.DeleteFamily) {
+                        if (firstCell.getTimestamp() >= ttlWindowStart &&
+                                keepDeletedCells == KeepDeletedCells.FALSE) {
+                            // This family delete marker deletes the rest of 
the row versions.
+                            // There is nothing left to do here
+                            return true;
+                        }
+                        lastDeleteFamilyMarker = firstCell;
+                    } else {
+                        lastDeleteFamilyVersionMarker = firstCell;
+                    }
+                    break;
+                }
+                if (firstCell.getType() == Cell.Type.DeleteColumn ||
+                        firstCell.getType() == Cell.Type.Delete) {
+                    if (columnDeleteMarkers == null) {
+                        columnDeleteMarkers = new ArrayList<>();
+                    }
+                    columnDeleteMarkers.add(firstCell);
+                    if (rowVersion != null) {
+                        rowVersion.addColumnDeleteMarker(firstCell);
+                    }
+                }
+                else if (rowVersion == null) {
+                    if (storeLevel && !firstStore && ts < ttlWindowStart &&
+                            version < maxVersion - 1 && lastDeleteFamilyMarker 
== null) {
+                        // For the stores that are not the first (or only) 
store of their region,
+                        // we cannot determine if a live row version is 
outside the TTL window
+                        // should be retained if the version of the row 
version is less than the max
+                        // version. We need to do a row scan to at the region 
level in this case.
+                        return false;
+                    }
+                    rowVersion = new CompactionRowVersion();
+                    rowVersions.add(rowVersion);
+                    rowVersion.ts = ts;
+                    if (lastDeleteFamilyVersionMarker != null) {
+                        rowVersion.deleteFamilyMarker = 
lastDeleteFamilyVersionMarker;
+                        // The delete family version delete marker is 
consumed, so set it to null
+                        lastDeleteFamilyVersionMarker = null;
+                    } else if (lastDeleteFamilyMarker != null) {
+                        rowVersion.deleteFamilyMarker = lastDeleteFamilyMarker;
+                    }
+                    if (columnDeleteMarkers != null) {
+                        rowVersion.columnDeleteMarkers = new 
ArrayList<>(columnDeleteMarkers);
+                    }
+                    rowVersion.cells.add(firstCell);
+                    rowVersion.version = version++;
+                } else if (firstCell.getTimestamp() == ts) {
+                    rowVersion.cells.add(firstCell);
+                } else if (columnDeleteMarkers == null ||
+                        !isCellDeleted(columnDeleteMarkers, rowVersion, 
firstCell, storeLevel)) {
+                    rowVersion.cells.add(firstCell);
+                }
+            }
+            // Remove the columns that are empty
+            int columnIndex = 0;
+            while (!columns.isEmpty() && columnIndex < columns.size()) {
+                if (columns.get(columnIndex).isEmpty()) {
+                    columns.remove(columnIndex);
+                } else {
+                    columnIndex++;
+                }
+            }
+        }
+        return true;
+    }
+
+    private void addCells(CompactionRowVersion rowVersion, List<Cell> result) {
+        if (rowVersion.columnDeleteMarkers == null) {
+            result.addAll(rowVersion.cells);
+            return;
+        }
+        for (Cell cell : rowVersion.cells) {
+            if (rowVersion.columnDeleteMarkers.isEmpty()) {
+                result.add(cell);
+            }
+            else {
+                Cell dm = rowVersion.columnDeleteMarkers.get(0);
+                if (Bytes.compareTo(cell.getQualifierArray(), 
cell.getQualifierOffset(),
+                        cell.getQualifierLength(), dm.getQualifierArray(), 
dm.getQualifierOffset(),
+                        dm.getQualifierLength()) == 0) {
+                    rowVersion.columnDeleteMarkers.remove(0);
+                    continue;
+                }
+                else {
+                    result.add(cell);
+                }
+            }
+        }
+    }
+    private void 
updateResultsWithRowOutsideMaxLookbackButInsideTTLWindow(List<Cell> result,
+            CompactionRowVersion rowVersion) {
+        // The rows that are outside the max lookback window but inside the 
TTL window
+        if (rowVersion.deleteFamilyMarker == null) {
+            if (rowVersion.version < maxVersion) {
+                // Live rows whose version is less than max version are 
retained inside
+                // the TTL window
+                addCells(rowVersion, result);
+            }
+        }
+        else if (keepDeletedCells == KeepDeletedCells.TTL) {
+            // Deleted rows inside the TTL window with KeepDeletedCells.TTL 
are retained
+            addCells(rowVersion, result);
+        }
+        else if (keepDeletedCells == KeepDeletedCells.TRUE) {
+            if (rowVersion.version < maxVersion) {
+                // Deleted rows inside the TTL window with 
KeepDeletedCells.TRUE are
+                // retained if their version is less than max version
+                addCells(rowVersion, result);
+            }
+        }
+    }
+
+    private void updateResultsWithRowOutsideTTLWindow(List<Cell> result,
+            CompactionRowVersion rowVersion) {
+        if (rowVersion.deleteFamilyMarker == null) {
+            if (firstStore) {
+                if (rowVersion.version < minVersion) {
+                    // Live rows whose version is less than min version are 
retained outside
+                    // the TTL window
+                    addCells(rowVersion, result);
+                }
+            } else if (rowVersion.version < maxVersion) {
+                // check if this row needs to be retained

Review Comment:
   There is no logic in this branch, only a comment.

> Fixing TTL and Max Lookback Issues for Phoenix Tables
> -----------------------------------------------------
>
>                 Key: PHOENIX-6888
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6888
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 5.1.3
>            Reporter: Kadir Ozdemir
>            Assignee: Kadir Ozdemir
>            Priority: Major
>
> In HBase, the unit of data is a cell and data retention rules are executed at 
> the cell level. These rules are defined at the column family level. Phoenix 
> leverages the data retention features of HBase and exposes them to its users 
> to provide its TTL feature at the table level. However, these rules (since 
> they are defined at the cell level instead of the row level) result in 
> partial row retention that in turn creates data integrity issues at the 
> Phoenix level. 
> Similarly, Phoenix’s max lookback feature leverages HBase deleted data 
> retention capabilities to preserve deleted cells within a configurable max 
> lookback. This requires two data retention windows, max lookback and TTL. One 
> end of these windows is the current time and the other end is a moment in the past 
> (i.e., current time minus the window size). Typically, the max lookback 
> window is shorter than the TTL window. In the max lookback window, we would 
> like to preserve the complete history of mutations regardless of how many 
> cell versions these mutations generated. In the remaining TTL window outside 
> the max lookback, we would like to apply the data retention rules defined 
> above. However, HBase provides only one data retention window. Thus, the max 
> lookback window had to be extended to become the TTL window, and the max 
> lookback feature ends up retaining deleted data longer than intended, for 
> the maximum of the max lookback and TTL periods. 
> This Jira is to fix both of these issues.
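
As a rough illustration of the two windows described above, the following sketch classifies a cell timestamp relative to the compaction time. The window-start arithmetic follows the constructor quoted in the review (TTL in seconds, max lookback in milliseconds, start of 1 when the TTL is "forever"). The class, enum, and method names are hypothetical, the `FOREVER` constant is a local stand-in for `HConstants.FOREVER`, and treating the window boundaries as inclusive is an assumption.

```java
// Hypothetical illustration only; not part of the PR.
final class RetentionWindows {

    enum Window { MAX_LOOKBACK, TTL_ONLY, EXPIRED }

    // Local stand-in for HConstants.FOREVER (no TTL configured).
    static final int FOREVER = Integer.MAX_VALUE;

    static Window classify(long cellTs, long compactionTime,
                           long maxLookbackInMs, int ttlSeconds) {
        long maxLookbackWindowStart = compactionTime - maxLookbackInMs;
        long ttlWindowStart = ttlSeconds == FOREVER
                ? 1
                : compactionTime - ttlSeconds * 1000L;
        if (cellTs >= maxLookbackWindowStart) {
            // Inside max lookback: the full mutation history is preserved.
            return Window.MAX_LOOKBACK;
        }
        if (cellTs >= ttlWindowStart) {
            // Outside max lookback but inside TTL: retention rules apply.
            return Window.TTL_ONLY;
        }
        // Older than the TTL window.
        return Window.EXPIRED;
    }

    public static void main(String[] args) {
        long now = 1_000_000_000L;
        // One-minute max lookback, one-hour TTL.
        System.out.println(classify(now - 1_000, now, 60_000, 3600));
        System.out.println(classify(now - 120_000, now, 60_000, 3600));
        System.out.println(classify(now - 4_000_000, now, 60_000, 3600));
    }
}
```

With a single HBase retention window, the first two cases could not be told apart, which is the limitation the description points out.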



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
