[ 
https://issues.apache.org/jira/browse/PHOENIX-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707461#comment-17707461
 ] 

ASF GitHub Bot commented on PHOENIX-6888:
-----------------------------------------

kadirozde commented on code in PR #1569:
URL: https://github.com/apache/phoenix/pull/1569#discussion_r1155014806


##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java:
##########
@@ -165,7 +165,7 @@ public void testBytesRowsForSelectWithLimitIgnored() throws 
Exception {
             ResultSet rs = conn.createStatement().executeQuery(sql);
             assertFalse(rs.next());
             Estimate info = getByteRowEstimates(conn, sql, binds);
-            assertEquals((Long) 390L, info.estimatedBytes);
+            assertEquals((Long) 691L, info.estimatedBytes);

Review Comment:
   This is to undo my change on 3/17/2019. I had the impression that the 
Phoenix server code should never return the empty column cell back to the 
client. It turned out that it was not the case especially for this query. As 
you can see, this PR does not remove the empty column from the returned result 
anymore. The increase/revert is because of that.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java:
##########
@@ -0,0 +1,908 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeepDeletedCells;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX;
+
+/**
+ * The store scanner that implements Phoenix TTL and Max Lookback. Phoenix 
overrides the
+ * HBase implementation of data retention policies which is built at the cell 
level, and implements
+ * its row level data retention within this store scanner.
+ */
+public class CompactionScanner implements InternalScanner {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CompactionScanner.class);
+    public static final String SEPARATOR = ":";
+    private final InternalScanner storeScanner;
+    private final Region region;
+    private final Store store;
+    private final Configuration config;
+    private final RegionCoprocessorEnvironment env;
+    private long maxLookbackWindowStart;
+    private long ttlWindowStart;
+    private long ttl;
+    private final long maxLookbackInMillis;
+    private int minVersion;
+    private int maxVersion;
+    private final boolean emptyCFStore;
+    private KeepDeletedCells keepDeletedCells;
+    private long compactionTime;
+    private final byte[] emptyCF;
+    private final byte[] emptyCQ;
+    private static Map<String, Long> maxLookbackMap = new 
ConcurrentHashMap<>();
+    private PhoenixLevelRowCompactor phoenixLevelRowCompactor;
+    private HBaseLevelRowCompactor hBaseLevelRowCompactor;
+
+    public CompactionScanner(RegionCoprocessorEnvironment env,
+            Store store,
+            InternalScanner storeScanner,
+            long maxLookbackInMillis,
+            byte[] emptyCF,
+            byte[] emptyCQ) {
+        this.storeScanner = storeScanner;
+        this.region = env.getRegion();
+        this.store = store;
+        this.env = env;
+        this.emptyCF = emptyCF;
+        this.emptyCQ = emptyCQ;
+        this.config = env.getConfiguration();
+        compactionTime = EnvironmentEdgeManager.currentTimeMillis();
+        this.maxLookbackInMillis = maxLookbackInMillis;
+        String columnFamilyName = store.getColumnFamilyName();
+        String tableName = region.getRegionInfo().getTable().getNameAsString();
+        Long overriddenMaxLookback =
+                maxLookbackMap.remove(tableName + SEPARATOR + 
columnFamilyName);
+        this.maxLookbackWindowStart = compactionTime - (overriddenMaxLookback 
== null ?
+                maxLookbackInMillis : Math.max(maxLookbackInMillis, 
overriddenMaxLookback)) - 1;
+        ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor();
+        ttl = cfd.getTimeToLive();
+        this.ttlWindowStart = ttl == HConstants.FOREVER ? 1 : compactionTime - 
ttl * 1000;
+        ttl *= 1000;
+        this.maxLookbackWindowStart = Math.max(ttlWindowStart, 
maxLookbackWindowStart);
+        this.minVersion = cfd.getMinVersions();
+        this.maxVersion = cfd.getMaxVersions();
+        this.keepDeletedCells = cfd.getKeepDeletedCells();
+        emptyCFStore = region.getTableDescriptor().getColumnFamilies().length 
== 1 ||
+                columnFamilyName.equals(Bytes.toString(emptyCF)) ||
+                columnFamilyName.startsWith(LOCAL_INDEX_COLUMN_FAMILY_PREFIX);
+        phoenixLevelRowCompactor = new PhoenixLevelRowCompactor();
+        hBaseLevelRowCompactor = new HBaseLevelRowCompactor();
+    }
+
+    /**
+     * Any coprocessors within a JVM can extend the max lookback window for a 
column family
+     * by calling this static method.
+     */
+    public static void overrideMaxLookback(String tableName, String 
columnFamilyName,
+            long maxLookbackInMillis) {
+        if (tableName == null || columnFamilyName == null) {
+            return;
+        }
+        Long old = maxLookbackMap.putIfAbsent(tableName + SEPARATOR + 
columnFamilyName,
+                maxLookbackInMillis);
+        if (old != null && old < maxLookbackInMillis) {
+            maxLookbackMap.put(tableName + SEPARATOR + columnFamilyName, 
maxLookbackInMillis);
+        }
+    }
+
+    @Override
+    public boolean next(List<Cell> result) throws IOException {
+        boolean hasMore = storeScanner.next(result);
+        if (!result.isEmpty()) {
+            phoenixLevelRowCompactor.compact(result, false);
+        }
+        return hasMore;
+    }
+
+    @Override
+    public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
+        return next(result);
+    }
+
+    @Override
+    public void close() throws IOException {
+        storeScanner.close();
+    }
+
+    /**
+     * The context for a given row during compaction. A row may have multiple 
compaction row
+     * versions. CompactionScanner uses the same row context for these 
versions.
+     */
+    class RowContext {
+        Cell familyDeleteMarker = null;
+        Cell familyVersionDeleteMarker = null;
+        List<Cell> columnDeleteMarkers = null;
+        int version = 0;
+        long maxTimestamp;
+        long minTimestamp;
+        private void addColumnDeleteMarker(Cell deleteMarker) {
+            if (columnDeleteMarkers == null) {
+                columnDeleteMarkers = new ArrayList<>();
+            }
+            columnDeleteMarkers.add(deleteMarker);
+        }
+    }
+
+    /**
+     * This method finds out the maximum and minimum timestamp of the cells of 
the next row
+     * version.
+     *
+     * @param columns
+     * @param rowContext
+     */
+    private void getNextRowVersionTimestamp(LinkedList<LinkedList<Cell>> 
columns,
+            RowContext rowContext) {
+        rowContext.maxTimestamp = 0;
+        rowContext.minTimestamp = Long.MAX_VALUE;
+        long ts;
+        long currentDeleteFamilyTimestamp = 0;
+        long nextDeleteFamilyTimestamp = 0;
+        boolean firstColumn = true;
+        for (LinkedList<Cell> column : columns) {
+            Cell firstCell = column.getFirst();
+            ts = firstCell.getTimestamp();
+            if (ts <= nextDeleteFamilyTimestamp) {
+                continue;
+            }
+            if (firstCell.getType() == Cell.Type.DeleteFamily ||
+                    firstCell.getType() == Cell.Type.DeleteFamilyVersion) {
+                if (firstColumn) {
+                    // Family delete markers are always found in the first 
column of a column family
+                    // When Phoenix deletes a row, it places a family delete 
marker in each column
+                    // family with the same timestamp. We just need to process 
the delete column
+                    // family markers of the first column family, which would 
be in the column
+                    // with columnIndex=0.
+                    currentDeleteFamilyTimestamp = firstCell.getTimestamp();
+                    // We need to check if the next delete family marker 
exits. If so, we need
+                    // to record its timestamp as by definition a compaction 
row version cannot
+                    // cross a family delete marker
+                    if (column.size() > 1) {
+                        nextDeleteFamilyTimestamp = 
column.get(1).getTimestamp();
+                    } else {
+                        nextDeleteFamilyTimestamp = 0;
+                    }
+                }
+            } if (firstCell.getType() == Cell.Type.Put) {

Review Comment:
   In oder to prevent the unnecessary evaluation of the second if condition 
when the first if condition is true, I wanted to have "else if" but did not 
type the else keyword apparently. I will add it.



##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME;
+
+/**
+ *  TTLRegionScanner masks expired rows using the empty column cell timestamp
+ */
+public class TTLRegionScanner extends BaseRegionScanner {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(TTLRegionScanner.class);
+    private final boolean isPhoenixTableTTLEnabled;
+    private final RegionCoprocessorEnvironment env;
+    private Scan scan;
+    private long rowCount = 0;
+    private long pageSize = Long.MAX_VALUE;
+    long ttl;
+    long ttlWindowStart;
+    byte[] emptyCQ;
+    byte[] emptyCF;
+    private boolean initialized = false;
+
+    public TTLRegionScanner(final RegionCoprocessorEnvironment env, final Scan 
scan,
+            final RegionScanner s) {
+        super(s);
+        this.env = env;
+        this.scan = scan;
+        emptyCQ = scan.getAttribute(EMPTY_COLUMN_QUALIFIER_NAME);
+        emptyCF = scan.getAttribute(EMPTY_COLUMN_FAMILY_NAME);
+        long currentTime = EnvironmentEdgeManager.currentTimeMillis();
+        ttl = 
env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
+        ttlWindowStart = ttl == HConstants.FOREVER ? 1 : currentTime - ttl * 
1000;
+        ttl *= 1000;
+        isPhoenixTableTTLEnabled = emptyCF != null && emptyCQ != null &&

Review Comment:
   Regardless if the TTL is disabled cluster wide or the client is an older 
client and did not supply the empty column parameters, the masking should not 
be done here. I will rename it to isMaskingEnabled to eliminate any confusion.



##########
phoenix-core/src/it/java/org/apache/phoenix/end2end/ExplainPlanWithStatsEnabledIT.java:
##########
@@ -204,7 +204,7 @@ public void testBytesRowsForSelectOnIndex() throws 
Exception {
                 assertTrue(rs.next());
             }
             Estimate info = getByteRowEstimates(conn, sql, binds);
-            assertEquals((Long) 390L, info.estimatedBytes);
+            assertEquals((Long) 691L, info.estimatedBytes);

Review Comment:
   See the previous comment.





> Fixing TTL and Max Lookback Issues for Phoenix Tables
> -----------------------------------------------------
>
>                 Key: PHOENIX-6888
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6888
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 5.1.3
>            Reporter: Kadir Ozdemir
>            Assignee: Kadir Ozdemir
>            Priority: Major
>
> In HBase, the unit of data is a cell and data retention rules are executed at 
> the cell level. These rules are defined at the column family level. Phoenix 
> leverages the data retention features of HBase and exposes them to its users 
> to provide its TTL feature at the table level. However, these rules (since 
> they are defined at the cell level instead of the row level) results in 
> partial row retention that in turn creates data integrity issues at the 
> Phoenix level. 
> Similarly, Phoenix’s max lookback feature leverages HBase deleted data 
> retention capabilities to preserve deleted cells within a configurable max 
> lookback. This requires two data retention windows, max lookback and TTL. One 
> end of these windows is the current time and the end is a moment in the past 
> (i.e., current time minus the window size). Typically, the max lookback 
> window is shorter than the TTL window. In the max lookback window, we would 
> like to preserve the complete history of mutations regardless of how many 
> cell versions these mutations generated. In the remaining TTL window outside 
> the max lookback, we would like to apply the data retention rules defined 
> above. However, HBase provides only one data retention window. Thus, the max 
> lookback window had to be extended to become TTL window and the max lookback 
> feature results in unwantedly retaining deleted data for the maximum of max 
> lookback and TTL periods. 
> This Jira is to fix both of these issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to