[
https://issues.apache.org/jira/browse/OMID-102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570303#comment-16570303
]
ASF GitHub Bot commented on OMID-102:
-------------------------------------
Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r207921192
--- Diff:
hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java
---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> commitCache;
+ private final HBaseTransaction hbaseTransaction;
+
+ // This cache is cleared when moving to the next row
+ // So no need to keep row name
+ private final Map<ImmutableBytesWritable, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ commitCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ commitCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this
transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ }
+
+ Optional<Long> ct = getCommitIfInSnapshot(v,
CellUtils.isFamilyDeleteCell(v));
+ if (ct.isPresent()) {
+ commitCache.put(v.getTimestamp(), ct.get());
+ if (hbaseTransaction.getVisibilityLevel() ==
AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL &&
+ snapshotFilter.getTSIfInTransaction(v,
hbaseTransaction).isPresent()) {
+ return runUserFilter(v, ReturnCode.INCLUDE);
+ }
+ if (CellUtils.isFamilyDeleteCell(v)) {
+ familyDeletionCache.put(createImmutableBytesWritable(v),
ct.get());
+ if (hbaseTransaction.getVisibilityLevel() ==
AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ return runUserFilter(v,
ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else {
+ return ReturnCode.NEXT_COL;
+ }
+ }
+ Long deleteCommit =
familyDeletionCache.get(createImmutableBytesWritable(v));
+ if (deleteCommit != null && deleteCommit >= v.getTimestamp()) {
+ if (hbaseTransaction.getVisibilityLevel() ==
AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ return runUserFilter(v,
ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else {
+ return ReturnCode.NEXT_COL;
+ }
+ }
+ if (CellUtils.isTombstone(v)) {
+ if (hbaseTransaction.getVisibilityLevel() ==
AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ return runUserFilter(v,
ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else {
+ return ReturnCode.NEXT_COL;
+ }
+ }
+
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ }
+
+ return ReturnCode.SKIP;
+ }
+
+
+ private ImmutableBytesWritable createImmutableBytesWritable(Cell v) {
+ return new ImmutableBytesWritable(v.getFamilyArray(),
+ v.getFamilyOffset(),v.getFamilyLength());
+ }
+
+ private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
+ throws IOException {
+ assert(snapshotReturn == ReturnCode.INCLUDE_AND_NEXT_COL ||
snapshotReturn == ReturnCode.INCLUDE);
+ if (userFilter == null) {
+ return snapshotReturn;
+ }
+
+ ReturnCode userRes = userFilter.filterKeyValue(v);
+ switch (userRes) {
+ case INCLUDE:
+ return snapshotReturn;
+ case SKIP:
+ return (snapshotReturn == ReturnCode.INCLUDE) ?
ReturnCode.SKIP: ReturnCode.NEXT_COL;
+ default:
+ return userRes;
+ }
+
+ }
+
+ // For family delete cells, the sc hasn't arrived yet so get sc from
region before going to ct
+ private Optional<Long> getCommitIfInSnapshot(Cell v, boolean
getShadowCellBeforeCT) throws IOException {
+ Long cachedCommitTS = commitCache.get(v.getTimestamp());
+ if (cachedCommitTS != null && hbaseTransaction.getStartTimestamp()
>= cachedCommitTS) {
+ return Optional.of(cachedCommitTS);
+ }
+ if (snapshotFilter.getTSIfInTransaction(v,
hbaseTransaction).isPresent()) {
+ return Optional.of(v.getTimestamp());
+ }
+
+ if (getShadowCellBeforeCT) {
+
+ // Try to get shadow cell from region
+ final Get get = new Get(CellUtil.cloneRow(v));
+ get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
+ get.addColumn(CellUtil.cloneFamily(v),
CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER));
+ Result shadowCell =
snapshotFilter.getTableAccessWrapper().get(get);
+
+ if (!shadowCell.isEmpty() &&
+
Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0] )) <=
hbaseTransaction.getStartTimestamp()){
--- End diff --
Don't repeat Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0] ))
here. You want to minimize heap allocation during filtering.
> Implement visibility filter as pure HBase Filter
> ------------------------------------------------
>
> Key: OMID-102
> URL: https://issues.apache.org/jira/browse/OMID-102
> Project: Apache Omid
> Issue Type: Sub-task
> Reporter: James Taylor
> Assignee: Yonatan Gottesman
> Priority: Major
>
> The way Omid currently filters through its own RegionScanner won't work the
> way it's implemented (i.e. the way the filtering is done *after* the next
> call). The reason is that the state of HBase filters gets messed up since
> these filters will start to see cells that they shouldn't (i.e. cells that
> would be filtered based on snapshot isolation). It cannot be worked around by
> manually running filters afterwards because filters may issue seek calls
> which are handled during the running of scans by HBase.
>
> Instead, the filtering needs to be implemented as a pure HBase filter and
> that filter needs to delegate to the other, delegate filter once it's
> determined that the cell is visible. See Tephra's TransactionVisibilityFilter
> and the way it calls the delegate filter (cellFilters) only after it's
> determined that the cell is visible. You may also run into TEPHRA-169 if you
> don't include the CellSkipFilter.
> Because it'll be easier if you see shadow cells *before* their corresponding
> real cells, you can prefix instead of suffix the column qualifiers to
> guarantee that you'd see the shadow cells prior to the actual cells. Or you
> could buffer cells in your filter prior to emitting them. Another issue would
> be if the shadow cells aren't found and you need to consult the commit table
> - I suppose if the shadow cells come first, it would be easier to know when
> this logic needs to be called.
>
> To reproduce, see the Phoenix unit tests
> FlappingTransactionIT.testInflightUpdateNotSeen() and
> testInflightDeleteNotSeen().
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)