virajjasani commented on code in PR #5545:
URL: https://github.com/apache/hbase/pull/5545#discussion_r1442383606


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java:
##########
@@ -17,42 +17,48 @@
  */
 package org.apache.hadoop.hbase.regionserver.compactions;
 
+import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_COMPACTION_ENABLE_DUAL_FILE_WRITER_KEY;
+import static org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.DEFAULT_ENABLE_DUAL_FILE_WRITER;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.function.Consumer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.DualFileWriter;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
 /**
  * Compact passed set of files. Create an instance and then call
  * {@link #compact(CompactionRequestImpl, ThroughputController, User)}
  */
 @InterfaceAudience.Private
-public class DefaultCompactor extends Compactor<StoreFileWriter> {
+public class DefaultCompactor extends AbstractMultiOutputCompactor<DualFileWriter> {
   private static final Logger LOG = LoggerFactory.getLogger(DefaultCompactor.class);

Review Comment:
   nit: no longer in use



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java:
##########
@@ -478,6 +477,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
             return false;
           }
         }
+        writer.appendAll(cells);

Review Comment:
   With this change: earlier, if the `hbase.hstore.close.check.interval` limit was reached, we would already have appended some cells of the batch inside the loop, whereas now we would not have appended any cell. This still seems correct, though, because even when the limit is reached we reset the compaction counters with `progress.cancel()`.
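   For illustration, a rough sketch of the ordering change (`limitReached` and the loop shape here are stand-ins, not the exact `performCompaction` code):
   ```java
   for (Cell c : cells) {
     if (limitReached(c)) { // e.g. hbase.hstore.close.check.interval exceeded
       progress.cancel();   // compaction counters are reset in both old and new code
       return false;        // new code: nothing from `cells` has been written yet
     }
     // old code: writer.append(c) happened here, so an abort on a later
     // iteration could leave part of the batch already written
   }
   writer.appendAll(cells); // new code: the batch is written only after all checks pass
   ```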



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DualFileWriter.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.HAS_LIVE_VERSIONS_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Separates the provided cells into two files, one file for the live cells and the other for the
+ * rest of the cells (historical cells). The live cells includes the live put cells, delete all and
+ * version delete markers that are not masked by other delete all markers.
+ */
+@InterfaceAudience.Private
+public class DualFileWriter extends AbstractMultiFileWriter {
+
+  private final CellComparator comparator;
+  private StoreFileWriter liveVersionWriter;
+  private StoreFileWriter historicalVersionWriter;
+
+  private final List<StoreFileWriter> writers;
+  // The last cell of the current row
+  private Cell lastCell;
+  // The first (latest) delete family marker of the current row
+  private Cell deleteFamily;
+  // The list of delete family version markers of the current row
+  private List<Cell> deleteFamilyVersionList = new ArrayList<>();
+  // The first (latest) delete column marker of the current column
+  private Cell deleteColumn;
+  // The list of delete column version markers of the current column
+  private List<Cell> deleteColumnVersionList = new ArrayList<>();
+  // The live put cell count for the current column
+  private int livePutCellCount;
+  private final boolean dualWriterEnabled;
+  private final int maxVersions;
+  private final boolean newVersionBehavior;
+
+  public DualFileWriter(CellComparator comparator, int maxVersions, boolean dualWriterEnabled,
+    boolean newVersionBehavior) {
+    this.comparator = comparator;
+    this.maxVersions = maxVersions;
+    this.dualWriterEnabled = dualWriterEnabled;
+    this.newVersionBehavior = newVersionBehavior;
+    writers = new ArrayList<>(2);
+    initRowState();
+  }
+
+  private void initRowState() {
+    deleteFamily = null;
+    deleteFamilyVersionList.clear();
+    lastCell = null;
+  }
+
+  private void initColumnState() {
+    livePutCellCount = 0;
+    deleteColumn = null;
+    deleteColumnVersionList.clear();
+
+  }
+
+  private void addLiveVersion(Cell cell) throws IOException {
+    if (liveVersionWriter == null) {
+      liveVersionWriter = writerFactory.createWriter();
+      writers.add(liveVersionWriter);
+    }
+    liveVersionWriter.append(cell);
+  }
+
+  private void addHistoricalVersion(Cell cell) throws IOException {
+    if (historicalVersionWriter == null) {
+      historicalVersionWriter = writerFactory.createWriter();
+      writers.add(historicalVersionWriter);
+    }
+    historicalVersionWriter.append(cell);
+  }
+
+  private boolean isDeletedByDeleteFamily(Cell cell) {
+    return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp()
+      || (deleteFamily.getTimestamp() == cell.getTimestamp()
+        && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId())));
+  }
+
+  private boolean isDeletedByDeleteFamilyVersion(Cell cell) {
+    for (Cell deleteFamilyVersion : deleteFamilyVersionList) {
+      if (
+        deleteFamilyVersion.getTimestamp() == cell.getTimestamp()
+          && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId())
+      ) return true;
+    }
+    return false;
+  }
+
+  private boolean isDeletedByDeleteColumn(Cell cell) {
+    return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp()
+      || (deleteColumn.getTimestamp() == cell.getTimestamp()
+        && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId())));
+  }
+
+  private boolean isDeletedByDeleteColumnVersion(Cell cell) {
+    for (Cell deleteColumnVersion : deleteColumnVersionList) {
+      if (
+        deleteColumnVersion.getTimestamp() == cell.getTimestamp()
+          && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId())
+      ) return true;
+    }
+    return false;
+  }
+
+  private boolean isDeleted(Cell cell) {
+    return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell)
+      || isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell);
+  }
+
+  private void appendCell(Cell cell) throws IOException {
+    if ((lastCell == null || !CellUtil.matchingColumn(lastCell, cell))) {
+      initColumnState();
+    }
+    if (cell.getType() == Cell.Type.DeleteFamily) {
+      if (deleteFamily == null) {
+        if (cell.getType() == Cell.Type.DeleteFamily) {
+          deleteFamily = cell;
+          addLiveVersion(cell);
+        } else {
+          addHistoricalVersion(cell);
+        }
+      }
+    } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
+      if (deleteFamily == null) {
+        deleteFamilyVersionList.add(cell);
+        addLiveVersion(cell);
+      } else {
+        addHistoricalVersion(cell);
+      }
+    } else if (cell.getType() == Cell.Type.DeleteColumn) {
+      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
+        deleteColumn = cell;
+        addLiveVersion(cell);
+      } else {
+        addHistoricalVersion(cell);
+      }
+    } else if (cell.getType() == Cell.Type.Delete) {
+      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
+        deleteColumnVersionList.add(cell);
+        addLiveVersion(cell);
+      } else {
+        addHistoricalVersion(cell);
+      }
+    } else if (cell.getType() == Cell.Type.Put) {
+      if (livePutCellCount < maxVersions) {
+        // This is a live put cell (i.e., the latest version) of a column. Is it deleted?
+        if (!isDeleted(cell)) {
+          addLiveVersion(cell);
+          livePutCellCount++;
+        } else {
+          // It is deleted
+          addHistoricalVersion(cell);
+        }
+      } else {
+        // It is an older put cell
+        addHistoricalVersion(cell);
+      }
+    }
+    lastCell = cell;
+  }
+
+  @Override
+  public void appendAll(List<Cell> cellList) throws IOException {
+    if (!dualWriterEnabled) {
+      // If the dual writer is not enabled then all cells are written to one file. We use
+      // the live version file in this case
+      for (Cell cell : cellList) {
+        addLiveVersion(cell);
+      }
+      return;
+    }
+    if (cellList.isEmpty()) {
+      return;
+    }
+    if (lastCell != null && comparator.compareRows(lastCell, cellList.get(0)) != 0) {
+      // It is a new row and thus time to reset the state
+      initRowState();
+    }
+    for (Cell cell : cellList) {
+      appendCell(cell);
+    }
+  }
+
+  @Override
+  public void append(Cell cell) throws IOException {
+    if (!dualWriterEnabled) {
+      // If the dual writer is not enabled then all cells are written to one file. We use
+      // the live version file in this case
+      addLiveVersion(cell);
+      return;
+    }
+    if (lastCell != null && comparator.compareRows(lastCell, cell) != 0) {
+      // It is a new row and thus time to reset the state
+      initRowState();
+    }
+    appendCell(cell);
+  }
+
+  @Override
+  protected Collection<StoreFileWriter> writers() {
+    return writers;
+  }
+
+  @Override
+  protected void preCommitWriters() throws IOException {
+    if (writers.isEmpty()) {
+      liveVersionWriter = writerFactory.createWriter();
+      writers.add(liveVersionWriter);
+    }
+    if (!dualWriterEnabled) {
+      return;
+    }
+    if (liveVersionWriter != null) {
+      liveVersionWriter.appendFileInfo(HAS_LIVE_VERSIONS_KEY, Bytes.toBytes(true));
+    }
+    if (historicalVersionWriter != null) {
+      historicalVersionWriter.appendFileInfo(HAS_LIVE_VERSIONS_KEY, Bytes.toBytes(false));
+    }
+  }
+
+  public HFile.Writer getHFileWriter() {

Review Comment:
   nit: instead of `getHFileWriter()`, maybe we can call it `getLiveVersionHFileWriter()`?
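   A hypothetical rendering (the method body is cut off in the hunk above, so the body below is only a guess):
   ```java
   /** Returns the underlying HFile writer of the live version file, if one was created. */
   public HFile.Writer getLiveVersionHFileWriter() {
     return liveVersionWriter == null ? null : liveVersionWriter.getHFileWriter();
   }
   ```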



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java:
##########
@@ -138,6 +140,12 @@ public class HStoreFile implements StoreFile {
   // Indicates if the file got compacted
   private volatile boolean compactedAway = false;
 
+  // Indicate if the file contains live cell versions for a given column
+  // in a row. MemStore flushes generate files with all cell versions. However,
+  // compactions can generate two files, one with the live cell versions and the other
+  // with the remaining (historical) cell versions.
+  private volatile boolean hasLiveVersions = true;

Review Comment:
   I was wondering why this is not kept `false` by default, but now I realize that it is only used by `DefaultStoreFileManager#getLiveVersionFiles`, which callers will only use when `hbase.hstore.defaultengine.enable.dualfilewriter` is enabled.
   
   Hence, unless the dual file writer is enabled, this flag is not used at all, so its default value here doesn't matter. Is that correct? If so, maybe we can add a comment to that effect here so that no one relies on this value without first checking `hbase.hstore.defaultengine.enable.dualfilewriter`.
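   Something along these lines, if that reading is right (suggested wording only):
   ```java
   // Indicates if the file contains cells with live versions. Only meaningful when
   // hbase.hstore.defaultengine.enable.dualfilewriter is enabled; do not rely on this
   // flag (or its default value) without checking that config first.
   private volatile boolean hasLiveVersions = true;
   ```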



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DualFileWriter.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.HAS_LIVE_VERSIONS_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Separates the provided cells into two files, one file for the live cells and the other for the
+ * rest of the cells (historical cells). The live cells includes the live put cells, delete all and
+ * version delete markers that are not masked by other delete all markers.
+ */
+@InterfaceAudience.Private
+public class DualFileWriter extends AbstractMultiFileWriter {
+
+  private final CellComparator comparator;
+  private StoreFileWriter liveVersionWriter;
+  private StoreFileWriter historicalVersionWriter;
+
+  private final List<StoreFileWriter> writers;
+  // The last cell of the current row
+  private Cell lastCell;
+  // The first (latest) delete family marker of the current row
+  private Cell deleteFamily;
+  // The list of delete family version markers of the current row
+  private List<Cell> deleteFamilyVersionList = new ArrayList<>();
+  // The first (latest) delete column marker of the current column
+  private Cell deleteColumn;
+  // The list of delete column version markers of the current column
+  private List<Cell> deleteColumnVersionList = new ArrayList<>();
+  // The live put cell count for the current column
+  private int livePutCellCount;
+  private final boolean dualWriterEnabled;
+  private final int maxVersions;
+  private final boolean newVersionBehavior;
+
+  public DualFileWriter(CellComparator comparator, int maxVersions, boolean dualWriterEnabled,
+    boolean newVersionBehavior) {
+    this.comparator = comparator;
+    this.maxVersions = maxVersions;
+    this.dualWriterEnabled = dualWriterEnabled;
+    this.newVersionBehavior = newVersionBehavior;
+    writers = new ArrayList<>(2);
+    initRowState();
+  }
+
+  private void initRowState() {
+    deleteFamily = null;
+    deleteFamilyVersionList.clear();
+    lastCell = null;
+  }
+
+  private void initColumnState() {
+    livePutCellCount = 0;
+    deleteColumn = null;
+    deleteColumnVersionList.clear();
+
+  }
+
+  private void addLiveVersion(Cell cell) throws IOException {
+    if (liveVersionWriter == null) {
+      liveVersionWriter = writerFactory.createWriter();
+      writers.add(liveVersionWriter);
+    }
+    liveVersionWriter.append(cell);
+  }
+
+  private void addHistoricalVersion(Cell cell) throws IOException {
+    if (historicalVersionWriter == null) {
+      historicalVersionWriter = writerFactory.createWriter();
+      writers.add(historicalVersionWriter);
+    }
+    historicalVersionWriter.append(cell);
+  }
+
+  private boolean isDeletedByDeleteFamily(Cell cell) {
+    return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp()
+      || (deleteFamily.getTimestamp() == cell.getTimestamp()
+        && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId())));
+  }
+
+  private boolean isDeletedByDeleteFamilyVersion(Cell cell) {
+    for (Cell deleteFamilyVersion : deleteFamilyVersionList) {
+      if (
+        deleteFamilyVersion.getTimestamp() == cell.getTimestamp()
+          && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId())
+      ) return true;
+    }
+    return false;
+  }
+
+  private boolean isDeletedByDeleteColumn(Cell cell) {
+    return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp()
+      || (deleteColumn.getTimestamp() == cell.getTimestamp()
+        && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId())));
+  }
+
+  private boolean isDeletedByDeleteColumnVersion(Cell cell) {
+    for (Cell deleteColumnVersion : deleteColumnVersionList) {
+      if (
+        deleteColumnVersion.getTimestamp() == cell.getTimestamp()
+          && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId())
+      ) return true;
+    }
+    return false;
+  }
+
+  private boolean isDeleted(Cell cell) {
+    return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell)
+      || isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell);
+  }
+
+  private void appendCell(Cell cell) throws IOException {
+    if ((lastCell == null || !CellUtil.matchingColumn(lastCell, cell))) {
+      initColumnState();
+    }
+    if (cell.getType() == Cell.Type.DeleteFamily) {
+      if (deleteFamily == null) {
+        if (cell.getType() == Cell.Type.DeleteFamily) {
+          deleteFamily = cell;
+          addLiveVersion(cell);
+        } else {
+          addHistoricalVersion(cell);
+        }
+      }
+    } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
+      if (deleteFamily == null) {
+        deleteFamilyVersionList.add(cell);
+        addLiveVersion(cell);
+      } else {
+        addHistoricalVersion(cell);
+      }
+    } else if (cell.getType() == Cell.Type.DeleteColumn) {
+      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
+        deleteColumn = cell;
+        addLiveVersion(cell);
+      } else {
+        addHistoricalVersion(cell);
+      }
+    } else if (cell.getType() == Cell.Type.Delete) {
+      if (!isDeletedByDeleteFamily(cell) && deleteColumn == null) {
+        deleteColumnVersionList.add(cell);
+        addLiveVersion(cell);
+      } else {
+        addHistoricalVersion(cell);
+      }
+    } else if (cell.getType() == Cell.Type.Put) {
+      if (livePutCellCount < maxVersions) {
+        // This is a live put cell (i.e., the latest version) of a column. Is it deleted?
+        if (!isDeleted(cell)) {
+          addLiveVersion(cell);
+          livePutCellCount++;
+        } else {
+          // It is deleted
+          addHistoricalVersion(cell);
+        }
+      } else {
+        // It is an older put cell
+        addHistoricalVersion(cell);
+      }
+    }
+    lastCell = cell;
+  }
+
+  @Override
+  public void appendAll(List<Cell> cellList) throws IOException {
+    if (!dualWriterEnabled) {
+      // If the dual writer is not enabled then all cells are written to one file. We use
+      // the live version file in this case
+      for (Cell cell : cellList) {
+        addLiveVersion(cell);
+      }
+      return;
+    }
+    if (cellList.isEmpty()) {
+      return;
+    }
+    if (lastCell != null && comparator.compareRows(lastCell, cellList.get(0)) != 0) {
+      // It is a new row and thus time to reset the state
+      initRowState();
+    }
+    for (Cell cell : cellList) {
+      appendCell(cell);
+    }
+  }
+
+  @Override
+  public void append(Cell cell) throws IOException {
+    if (!dualWriterEnabled) {
+      // If the dual writer is not enabled then all cells are written to one file. We use
+      // the live version file in this case
+      addLiveVersion(cell);
+      return;
+    }
+    if (lastCell != null && comparator.compareRows(lastCell, cell) != 0) {
+      // It is a new row and thus time to reset the state
+      initRowState();
+    }
+    appendCell(cell);
+  }
+
+  @Override
+  protected Collection<StoreFileWriter> writers() {
+    return writers;
+  }
+
+  @Override
+  protected void preCommitWriters() throws IOException {
+    if (writers.isEmpty()) {
+      liveVersionWriter = writerFactory.createWriter();
+      writers.add(liveVersionWriter);
+    }

Review Comment:
   Is it possible for `writers` to be empty in the pre-commit phase? And even if it is, do we really need to initialize `liveVersionWriter` here, or am I missing something?
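   If `writers` can indeed never be empty here, the method could presumably shrink to something like (sketch derived from the code above):
   ```java
   @Override
   protected void preCommitWriters() throws IOException {
     if (!dualWriterEnabled) {
       return;
     }
     if (liveVersionWriter != null) {
       liveVersionWriter.appendFileInfo(HAS_LIVE_VERSIONS_KEY, Bytes.toBytes(true));
     }
     if (historicalVersionWriter != null) {
       historicalVersionWriter.appendFileInfo(HAS_LIVE_VERSIONS_KEY, Bytes.toBytes(false));
     }
   }
   ```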



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DualFileWriter.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.HAS_LIVE_VERSIONS_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Separates the provided cells into two files, one file for the live cells and the other for the
+ * rest of the cells (historical cells). The live cells includes the live put cells, delete all and
+ * version delete markers that are not masked by other delete all markers.
+ */
+@InterfaceAudience.Private
+public class DualFileWriter extends AbstractMultiFileWriter {
+
+  private final CellComparator comparator;
+  private StoreFileWriter liveVersionWriter;
+  private StoreFileWriter historicalVersionWriter;
+
+  private final List<StoreFileWriter> writers;
+  // The last cell of the current row
+  private Cell lastCell;
+  // The first (latest) delete family marker of the current row
+  private Cell deleteFamily;
+  // The list of delete family version markers of the current row
+  private List<Cell> deleteFamilyVersionList = new ArrayList<>();
+  // The first (latest) delete column marker of the current column
+  private Cell deleteColumn;
+  // The list of delete column version markers of the current column
+  private List<Cell> deleteColumnVersionList = new ArrayList<>();
+  // The live put cell count for the current column
+  private int livePutCellCount;
+  private final boolean dualWriterEnabled;
+  private final int maxVersions;
+  private final boolean newVersionBehavior;
+
+  public DualFileWriter(CellComparator comparator, int maxVersions, boolean dualWriterEnabled,
+    boolean newVersionBehavior) {
+    this.comparator = comparator;
+    this.maxVersions = maxVersions;
+    this.dualWriterEnabled = dualWriterEnabled;
+    this.newVersionBehavior = newVersionBehavior;
+    writers = new ArrayList<>(2);
+    initRowState();
+  }
+
+  private void initRowState() {
+    deleteFamily = null;
+    deleteFamilyVersionList.clear();
+    lastCell = null;
+  }
+
+  private void initColumnState() {
+    livePutCellCount = 0;
+    deleteColumn = null;
+    deleteColumnVersionList.clear();
+
+  }
+
+  private void addLiveVersion(Cell cell) throws IOException {
+    if (liveVersionWriter == null) {
+      liveVersionWriter = writerFactory.createWriter();
+      writers.add(liveVersionWriter);
+    }
+    liveVersionWriter.append(cell);
+  }
+
+  private void addHistoricalVersion(Cell cell) throws IOException {
+    if (historicalVersionWriter == null) {
+      historicalVersionWriter = writerFactory.createWriter();
+      writers.add(historicalVersionWriter);
+    }
+    historicalVersionWriter.append(cell);
+  }
+
+  private boolean isDeletedByDeleteFamily(Cell cell) {
+    return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp()
+      || (deleteFamily.getTimestamp() == cell.getTimestamp()
+        && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId())));
+  }
+
+  private boolean isDeletedByDeleteFamilyVersion(Cell cell) {
+    for (Cell deleteFamilyVersion : deleteFamilyVersionList) {
+      if (
+        deleteFamilyVersion.getTimestamp() == cell.getTimestamp()
+          && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId())
+      ) return true;
+    }
+    return false;
+  }
+
+  private boolean isDeletedByDeleteColumn(Cell cell) {
+    return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp()
+      || (deleteColumn.getTimestamp() == cell.getTimestamp()
+        && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId())));
+  }
+
+  private boolean isDeletedByDeleteColumnVersion(Cell cell) {
+    for (Cell deleteColumnVersion : deleteColumnVersionList) {
+      if (
+        deleteColumnVersion.getTimestamp() == cell.getTimestamp()
+          && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId())
+      ) return true;
+    }
+    return false;
+  }
+
+  private boolean isDeleted(Cell cell) {
+    return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell)
+      || isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell);
+  }
+
+  private void appendCell(Cell cell) throws IOException {
+    if ((lastCell == null || !CellUtil.matchingColumn(lastCell, cell))) {
+      initColumnState();
+    }
+    if (cell.getType() == Cell.Type.DeleteFamily) {
+      if (deleteFamily == null) {
+        if (cell.getType() == Cell.Type.DeleteFamily) {
+          deleteFamily = cell;
+          addLiveVersion(cell);
+        } else {
+          addHistoricalVersion(cell);
+        }
+      }
+    } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
+      if (deleteFamily == null) {

Review Comment:
   Do we not need a check here to see whether the DeleteFamilyVersion marker is itself deleted by the DeleteFamily marker?
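   In other words, should this branch mirror the `DeleteColumn` branch, like the sketch below (hypothetical, not a confirmed fix)?
   ```java
   } else if (cell.getType() == Cell.Type.DeleteFamilyVersion) {
     if (!isDeletedByDeleteFamily(cell)) {
       deleteFamilyVersionList.add(cell);
       addLiveVersion(cell);
     } else {
       addHistoricalVersion(cell);
     }
   }
   ```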



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java:
##########
@@ -46,13 +45,16 @@ public class DefaultStoreEngine extends StoreEngine<DefaultStoreFlusher, RatioBa
    "hbase.hstore.defaultengine.compactor.class";
  public static final String DEFAULT_COMPACTION_POLICY_CLASS_KEY =
    "hbase.hstore.defaultengine.compactionpolicy.class";
+  public static final String DEFAULT_COMPACTION_ENABLE_DUAL_FILE_WRITER_KEY =
+    "hbase.hstore.defaultengine.enable.dualfilewriter";
 
  private static final Class<? extends DefaultStoreFlusher> DEFAULT_STORE_FLUSHER_CLASS =
    DefaultStoreFlusher.class;
  private static final Class<? extends DefaultCompactor> DEFAULT_COMPACTOR_CLASS =
    DefaultCompactor.class;
-  private static final Class<? extends RatioBasedCompactionPolicy> DEFAULT_COMPACTION_POLICY_CLASS =
+  public static final Class<? extends RatioBasedCompactionPolicy> DEFAULT_COMPACTION_POLICY_CLASS =
     ExploringCompactionPolicy.class;
+  public static final boolean DEFAULT_ENABLE_DUAL_FILE_WRITER = false;

Review Comment:
   I think we can temporarily change this to `true` and see how the build results go. After all tests pass, we can change it back to `false` before committing the changes.
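   i.e. something like this, just for the test runs:
   ```java
   // TEMPORARY: enable the dual file writer by default so the full test suite
   // exercises it; flip back to false before committing.
   public static final boolean DEFAULT_ENABLE_DUAL_FILE_WRITER = true;
   ```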



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java:
##########
@@ -124,7 +124,7 @@ public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreF
   */
  public static List<StoreFileScanner> getScannersForStoreFiles(Collection<HStoreFile> files,
    boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean canUseDrop,
-    ScanQueryMatcher matcher, long readPt) throws IOException {
+    ScanQueryMatcher matcher, long readPt, boolean onlyLatestVersion) throws IOException {

Review Comment:
   not required?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java:
##########
@@ -223,6 +224,10 @@ private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
     this.currentScanners.addAll(scanners);
   }
 
+  private static boolean isOnlyLatestVersionScan(Scan scan) {
+    return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
+  }

Review Comment:
   nit: perhaps good to add a comment that we don't need to check `Scan#getMaxVersions` here, because the live version file generated by the dual file writer will anyway retain the max versions specified in the ColumnFamilyDescriptor for the given CF?
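   e.g. (suggested wording only):
   ```java
   // No need to check Scan#getMaxVersions here: the live version file written by the
   // dual file writer already retains up to the max versions configured in the
   // ColumnFamilyDescriptor for the given CF.
   private static boolean isOnlyLatestVersionScan(Scan scan) {
     return !scan.isRaw() && scan.getTimeRange().getMax() == HConstants.LATEST_TIMESTAMP;
   }
   ```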



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DualFileWriter.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.HStoreFile.HAS_LIVE_VERSIONS_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Separates the provided cells into two files, one file for the live cells and the other for the
+ * rest of the cells (historical cells). The live cells includes the live put cells, delete all and
+ * version delete markers that are not masked by other delete all markers.
+ */
+@InterfaceAudience.Private
+public class DualFileWriter extends AbstractMultiFileWriter {
+
+  private final CellComparator comparator;
+  private StoreFileWriter liveVersionWriter;
+  private StoreFileWriter historicalVersionWriter;
+
+  private final List<StoreFileWriter> writers;
+  // The last cell of the current row
+  private Cell lastCell;
+  // The first (latest) delete family marker of the current row
+  private Cell deleteFamily;
+  // The list of delete family version markers of the current row
+  private List<Cell> deleteFamilyVersionList = new ArrayList<>();
+  // The first (latest) delete column marker of the current column
+  private Cell deleteColumn;
+  // The list of delete column version markers of the current column
+  private List<Cell> deleteColumnVersionList = new ArrayList<>();
+  // The live put cell count for the current column
+  private int livePutCellCount;
+  private final boolean dualWriterEnabled;
+  private final int maxVersions;
+  private final boolean newVersionBehavior;
+
+  public DualFileWriter(CellComparator comparator, int maxVersions, boolean dualWriterEnabled,
+    boolean newVersionBehavior) {
+    this.comparator = comparator;
+    this.maxVersions = maxVersions;
+    this.dualWriterEnabled = dualWriterEnabled;
+    this.newVersionBehavior = newVersionBehavior;
+    writers = new ArrayList<>(2);
+    initRowState();
+  }
+
+  private void initRowState() {
+    deleteFamily = null;
+    deleteFamilyVersionList.clear();
+    lastCell = null;
+  }
+
+  private void initColumnState() {
+    livePutCellCount = 0;
+    deleteColumn = null;
+    deleteColumnVersionList.clear();
+
+  }
+
+  private void addLiveVersion(Cell cell) throws IOException {
+    if (liveVersionWriter == null) {
+      liveVersionWriter = writerFactory.createWriter();
+      writers.add(liveVersionWriter);
+    }
+    liveVersionWriter.append(cell);
+  }
+
+  private void addHistoricalVersion(Cell cell) throws IOException {
+    if (historicalVersionWriter == null) {
+      historicalVersionWriter = writerFactory.createWriter();
+      writers.add(historicalVersionWriter);
+    }
+    historicalVersionWriter.append(cell);
+  }
+
+  private boolean isDeletedByDeleteFamily(Cell cell) {
+    return deleteFamily != null && (deleteFamily.getTimestamp() > cell.getTimestamp()
+      || (deleteFamily.getTimestamp() == cell.getTimestamp()
+        && (!newVersionBehavior || cell.getSequenceId() < deleteFamily.getSequenceId())));
+  }
+
+  private boolean isDeletedByDeleteFamilyVersion(Cell cell) {
+    for (Cell deleteFamilyVersion : deleteFamilyVersionList) {
+      if (
+        deleteFamilyVersion.getTimestamp() == cell.getTimestamp()
+          && (!newVersionBehavior || cell.getSequenceId() < deleteFamilyVersion.getSequenceId())
+      ) return true;
+    }
+    return false;
+  }
+
+  private boolean isDeletedByDeleteColumn(Cell cell) {
+    return deleteColumn != null && (deleteColumn.getTimestamp() > cell.getTimestamp()
+      || (deleteColumn.getTimestamp() == cell.getTimestamp()
+        && (!newVersionBehavior || cell.getSequenceId() < deleteColumn.getSequenceId())));
+  }
+
+  private boolean isDeletedByDeleteColumnVersion(Cell cell) {
+    for (Cell deleteColumnVersion : deleteColumnVersionList) {
+      if (
+        deleteColumnVersion.getTimestamp() == cell.getTimestamp()
+          && (!newVersionBehavior || cell.getSequenceId() < deleteColumnVersion.getSequenceId())
+      ) return true;
+    }
+    return false;
+  }
+
+  private boolean isDeleted(Cell cell) {
+    return isDeletedByDeleteFamily(cell) || isDeletedByDeleteColumn(cell)
+      || isDeletedByDeleteFamilyVersion(cell) || isDeletedByDeleteColumnVersion(cell);
+  }
+
+  private void appendCell(Cell cell) throws IOException {
+    if ((lastCell == null || !CellUtil.matchingColumn(lastCell, cell))) {
+      initColumnState();
+    }
+    if (cell.getType() == Cell.Type.DeleteFamily) {
+      if (deleteFamily == null) {
+        if (cell.getType() == Cell.Type.DeleteFamily) {
+          deleteFamily = cell;
+          addLiveVersion(cell);
+        } else {
+          addHistoricalVersion(cell);
+        }
+      }

Review Comment:
   `if (cell.getType() == Cell.Type.DeleteFamily)` is repeated, which means we are never adding the cell to the historical version file.
   
   I think this was meant to be:
   ```
       if (cell.getType() == Cell.Type.DeleteFamily) {
         if (deleteFamily == null) {
           deleteFamily = cell;
           addLiveVersion(cell);
         } else {
           addHistoricalVersion(cell);
         }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

