Repository: hbase
Updated Branches:
  refs/heads/master 9e9db3245 -> 0b28155d2


HBASE-20625 refactor some WALCellCodec related code

Signed-off-by: Guanghao Zhang <zg...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0b28155d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0b28155d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0b28155d

Branch: refs/heads/master
Commit: 0b28155d274910b4e667b949d51f78809a1eff0b
Parents: 9e9db32
Author: jingyuntian <tianjy1...@gmail.com>
Authored: Thu Jun 14 10:25:24 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Jun 14 19:37:01 2018 +0800

----------------------------------------------------------------------
 .../hbase/protobuf/ReplicationProtbufUtil.java  | 61 ++++----------
 .../wal/AbstractProtobufLogWriter.java          |  3 +
 .../wal/AsyncProtobufLogWriter.java             |  1 -
 .../regionserver/wal/CompressionContext.java    | 54 ++++++------
 .../regionserver/wal/ProtobufLogReader.java     |  2 +
 .../regionserver/wal/ProtobufLogWriter.java     |  1 -
 .../hbase/regionserver/wal/ReaderBase.java      |  3 -
 .../wal/SecureProtobufLogReader.java            |  1 +
 .../hbase/regionserver/wal/WALCellCodec.java    | 87 ++++++++++++++------
 .../replication/ClusterMarkingEntryFilter.java  |  2 -
 .../java/org/apache/hadoop/hbase/wal/WAL.java   | 11 ---
 .../org/apache/hadoop/hbase/wal/WALKeyImpl.java | 60 +++++---------
 .../wal/FaultyProtobufLogReader.java            |  3 -
 13 files changed, 134 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
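
In brief, the hunks below make CompressionContext keep its dictionaries in an EnumMap keyed by a new DictionaryIndex enum instead of exposing one public field per dictionary, change ByteStringCompressor/ByteStringUncompressor to take a dictionary index rather than a Dictionary, drop the CompressionContext from WALKeyImpl and WAL.Entry, and give call sites that previously branched on a null context explicit no-op NoneCompressor/NoneUncompressor instances. A minimal sketch of the resulting call pattern, pieced together from the hunks below (the surrounding writer wiring and IOException handling are assumed, not shown):

    // Writers always hold a compressor; when WAL compression is off they get
    // a no-op instance instead of branching on a null compression context.
    WALCellCodec.ByteStringCompressor compressor =
        doCompress ? codec.getByteStringCompressor() : WALCellCodec.getNoneCompressor();

    // The WAL key serializes itself through that interface, addressing
    // dictionaries by enum rather than by field.
    WALProtos.WALKey.Builder keyBuilder = walKey.getBuilder(compressor);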


http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 81dd59e..157ad1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -24,29 +24,25 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.UUID;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
 import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
 @InterfaceAudience.Private
 public class ReplicationProtbufUtil {
   /**
@@ -81,7 +77,7 @@ public class ReplicationProtbufUtil {
    * found.
    */
   public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
-      buildReplicateWALEntryRequest(final Entry[] entries) {
+      buildReplicateWALEntryRequest(final Entry[] entries) throws IOException {
     return buildReplicateWALEntryRequest(entries, null, null, null, null);
   }
 
@@ -97,53 +93,30 @@ public class ReplicationProtbufUtil {
    */
   public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
      buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
-          String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
+          String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
+          throws IOException {
     // Accumulate all the Cells seen in here.
     List<List<? extends Cell>> allCells = new ArrayList<>(entries.length);
     int size = 0;
-    WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
     AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
     AdminProtos.ReplicateWALEntryRequest.Builder builder =
       AdminProtos.ReplicateWALEntryRequest.newBuilder();
-    HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
+
     for (Entry entry: entries) {
       entryBuilder.clear();
-      // TODO: this duplicates a lot in WALKeyImpl#getBuilder
-      WALProtos.WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
-      WALKeyImpl key = entry.getKey();
-      keyBuilder.setEncodedRegionName(
-          UnsafeByteOperations.unsafeWrap(encodedRegionName == null
-            ? key.getEncodedRegionName()
-            : encodedRegionName));
-      keyBuilder.setTableName(UnsafeByteOperations.unsafeWrap(key.getTableName().getName()));
-      long sequenceId = key.getSequenceId();
-      keyBuilder.setLogSequenceNumber(sequenceId);
-      keyBuilder.setWriteTime(key.getWriteTime());
-      if (key.getNonce() != HConstants.NO_NONCE) {
-        keyBuilder.setNonce(key.getNonce());
+      WALProtos.WALKey.Builder keyBuilder;
+      try {
+        keyBuilder = entry.getKey().getBuilder(WALCellCodec.getNoneCompressor());
+      } catch (IOException e) {
+        throw new IOException(
+            "Should never happen, as the NoneCompressor does not throw any exceptions", e);
       }
-      if (key.getNonceGroup() != HConstants.NO_NONCE) {
-        keyBuilder.setNonceGroup(key.getNonceGroup());
-      }
-      for(UUID clusterId : key.getClusterIds()) {
-        uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
-        uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
-        keyBuilder.addClusterIds(uuidBuilder.build());
-      }
-      if (key.getOrigLogSeqNum() > 0) {
-        keyBuilder.setOrigSequenceNumber(key.getOrigLogSeqNum());
+      if (encodedRegionName != null) {
+        keyBuilder.setEncodedRegionName(
+            UnsafeByteOperations.unsafeWrap(encodedRegionName));
       }
+      entryBuilder.setKey(keyBuilder.build());
       WALEdit edit = entry.getEdit();
-      NavigableMap<byte[], Integer> scopes = key.getReplicationScopes();
-      if (scopes != null && !scopes.isEmpty()) {
-        for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
-          scopeBuilder.setFamily(UnsafeByteOperations.unsafeWrap(scope.getKey()));
-          WALProtos.ScopeType scopeType =
-              WALProtos.ScopeType.valueOf(scope.getValue().intValue());
-          scopeBuilder.setScopeType(scopeType);
-          keyBuilder.addScopes(scopeBuilder.build());
-        }
-      }
       List<Cell> cells = edit.getCells();
       // Add up the size.  It is used later serializing out the kvs.
       for (Cell cell: cells) {
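
In short, the replication path stops hand-building the WALKey protobuf and delegates to WALKeyImpl.getBuilder with the no-op compressor, since replicated entries are never dictionary-compressed on the wire. A condensed sketch of the new loop body (the cell-size accounting and the catch-and-rethrow around getBuilder in the real method are omitted):

    for (Entry entry : entries) {
      // Let the key serialize itself with the no-op compressor.
      WALProtos.WALKey.Builder keyBuilder =
          entry.getKey().getBuilder(WALCellCodec.getNoneCompressor());
      // Only override the encoded region name when the caller supplied one.
      if (encodedRegionName != null) {
        keyBuilder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(encodedRegionName));
      }
      entryBuilder.setKey(keyBuilder.build());
    }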

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index 50ac101..ae084a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -183,6 +183,8 @@ public abstract class AbstractProtobufLogWriter {
     this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
     if (doCompress) {
       this.compressor = codec.getByteStringCompressor();
+    } else {
+      this.compressor = WALCellCodec.getNoneCompressor();
     }
   }
 
@@ -198,6 +200,7 @@ public abstract class AbstractProtobufLogWriter {
       this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder());
       // We do not support compression
       this.compressionContext = null;
+      this.compressor = WALCellCodec.getNoneCompressor();
     } else {
       initAfterHeader0(doCompress);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index abdc24e..6368fb7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -120,7 +120,6 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter
   @Override
   public void append(Entry entry) {
     int buffered = output.buffered();
-    entry.setCompressionContext(compressionContext);
     try {
     entry.getKey().
       getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
index 22ac499..16866e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/CompressionContext.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.EnumMap;
+import java.util.Map;
 
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -35,12 +37,12 @@ public class CompressionContext {
   static final String ENABLE_WAL_TAGS_COMPRESSION =
       "hbase.regionserver.wal.tags.enablecompression";
 
-  // visible only for WALKey, until we move everything into o.a.h.h.wal
-  public final Dictionary regionDict;
-  public final Dictionary tableDict;
-  public final Dictionary familyDict;
-  final Dictionary qualifierDict;
-  final Dictionary rowDict;
+  public enum DictionaryIndex {
+    REGION, TABLE, FAMILY, QUALIFIER, ROW
+  }
+
+  private final Map<DictionaryIndex, Dictionary> dictionaries =
+      new EnumMap<>(DictionaryIndex.class);
   // Context used for compressing tags
   TagCompressionContext tagCompressionContext = null;
 
@@ -49,33 +51,35 @@ public class CompressionContext {
       InstantiationException, IllegalAccessException, InvocationTargetException {
     Constructor<? extends Dictionary> dictConstructor =
         dictType.getConstructor();
-    regionDict = dictConstructor.newInstance();
-    tableDict = dictConstructor.newInstance();
-    familyDict = dictConstructor.newInstance();
-    qualifierDict = dictConstructor.newInstance();
-    rowDict = dictConstructor.newInstance();
-    if (recoveredEdits) {
-      // This will never change
-      regionDict.init(1);
-      tableDict.init(1);
+    for (DictionaryIndex dictionaryIndex : DictionaryIndex.values()) {
+      Dictionary newDictionary = dictConstructor.newInstance();
+      dictionaries.put(dictionaryIndex, newDictionary);
+    }
+    if (recoveredEdits) {
+      getDictionary(DictionaryIndex.REGION).init(1);
+      getDictionary(DictionaryIndex.TABLE).init(1);
     } else {
-      regionDict.init(Short.MAX_VALUE);
-      tableDict.init(Short.MAX_VALUE);
+      getDictionary(DictionaryIndex.REGION).init(Short.MAX_VALUE);
+      getDictionary(DictionaryIndex.TABLE).init(Short.MAX_VALUE);
     }
-    rowDict.init(Short.MAX_VALUE);
-    familyDict.init(Byte.MAX_VALUE);
-    qualifierDict.init(Byte.MAX_VALUE);
+
+    getDictionary(DictionaryIndex.ROW).init(Short.MAX_VALUE);
+    getDictionary(DictionaryIndex.FAMILY).init(Byte.MAX_VALUE);
+    getDictionary(DictionaryIndex.QUALIFIER).init(Byte.MAX_VALUE);
+
     if (hasTagCompression) {
       tagCompressionContext = new TagCompressionContext(dictType, Short.MAX_VALUE);
     }
   }
 
+  public Dictionary getDictionary(Enum dictIndex) {
+    return dictionaries.get(dictIndex);
+  }
+
   void clear() {
-    regionDict.clear();
-    tableDict.clear();
-    familyDict.clear();
-    qualifierDict.clear();
-    rowDict.clear();
+    for (Dictionary dictionary : dictionaries.values()) {
+      dictionary.clear();
+    }
     if (tagCompressionContext != null) {
       tagCompressionContext.clear();
     }
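
The net effect of this hunk is that the per-field dictionaries become an EnumMap lookup. A small usage sketch, assuming an already constructed CompressionContext named ctx (the dictionaries themselves are created and sized in the constructor above):

    // Dictionaries are now looked up by DictionaryIndex rather than through
    // the removed public fields (regionDict, tableDict, familyDict, ...).
    Dictionary regionDict = ctx.getDictionary(CompressionContext.DictionaryIndex.REGION);
    Dictionary rowDict = ctx.getDictionary(CompressionContext.DictionaryIndex.ROW);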

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
index 5d8d8c0..83398bd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogReader.java
@@ -312,6 +312,8 @@ public class ProtobufLogReader extends ReaderBase {
     this.cellDecoder = codec.getDecoder(this.inputStream);
     if (this.hasCompression) {
       this.byteStringUncompressor = codec.getByteStringUncompressor();
+    } else {
+      this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 2852047..b4e2cbf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -47,7 +47,6 @@ public class ProtobufLogWriter extends AbstractProtobufLogWriter
 
   @Override
   public void append(Entry entry) throws IOException {
-    entry.setCompressionContext(compressionContext);
     entry.getKey().getBuilder(compressor).
         setFollowingKvCount(entry.getEdit().size()).build().writeDelimitedTo(output);
     for (Cell cell : entry.getEdit().getCells()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
index 4338f6d..27d40b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ReaderBase.java
@@ -92,9 +92,6 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
     if (e == null) {
       e = new Entry();
     }
-    if (compressionContext != null) {
-      e.setCompressionContext(compressionContext);
-    }
 
     boolean hasEntry = false;
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
index b1f17ad..e43d140 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureProtobufLogReader.java
@@ -141,6 +141,7 @@ public class SecureProtobufLogReader extends ProtobufLogReader {
       this.cellDecoder = codec.getDecoder(this.inputStream);
       // We do not support compression with WAL encryption
       this.compressionContext = null;
+      this.byteStringUncompressor = WALCellCodec.getNoneUncompressor();
       this.hasCompression = false;
     } else {
       super.initAfterCompression(cellCodecClsName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
index 2922a10..34d83f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 import org.apache.hadoop.io.IOUtils;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 
 /**
@@ -62,12 +63,6 @@ public class WALCellCodec implements Codec {
  public static final String WAL_CELL_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
 
   protected final CompressionContext compression;
-  protected final ByteStringUncompressor statelessUncompressor = new ByteStringUncompressor() {
-    @Override
-    public byte[] uncompress(ByteString data, Dictionary dict) throws IOException {
-      return WALCellCodec.uncompressByteString(data, dict);
-    }
-  };
 
   /**
    * <b>All subclasses must implement a no argument constructor</b>
@@ -132,17 +127,32 @@ public class WALCellCodec implements Codec {
   }
 
   public interface ByteStringCompressor {
-    ByteString compress(byte[] data, Dictionary dict) throws IOException;
+    ByteString compress(byte[] data, Enum dictIndex) throws IOException;
   }
 
   public interface ByteStringUncompressor {
-    byte[] uncompress(ByteString data, Dictionary dict) throws IOException;
+    byte[] uncompress(ByteString data, Enum dictIndex) throws IOException;
+  }
+
+  static class StatelessUncompressor implements ByteStringUncompressor {
+    CompressionContext compressionContext;
+
+    public StatelessUncompressor(CompressionContext compressionContext) {
+      this.compressionContext = compressionContext;
+    }
+
+    @Override
+    public byte[] uncompress(ByteString data, Enum dictIndex) throws IOException {
+      return WALCellCodec.uncompressByteString(data, compressionContext.getDictionary(dictIndex));
+    }
   }
 
-  // TODO: it sucks that compression context is in WAL.Entry. It'd be nice if it was here.
-  //       Dictionary could be gotten by enum; initially, based on enum, context would create
-  //       an array of dictionaries.
  static class BaosAndCompressor extends ByteArrayOutputStream implements ByteStringCompressor {
+    private CompressionContext compressionContext;
+
+    public BaosAndCompressor(CompressionContext compressionContext) {
+      this.compressionContext = compressionContext;
+    }
     public ByteString toByteString() {
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
       // them.
@@ -150,8 +160,8 @@ public class WALCellCodec implements Codec {
     }
 
     @Override
-    public ByteString compress(byte[] data, Dictionary dict) throws IOException {
-      writeCompressed(data, dict);
+    public ByteString compress(byte[] data, Enum dictIndex) throws IOException {
+      writeCompressed(data, dictIndex);
      // We need this copy to create the ByteString as the byte[] 'buf' is not immutable. We reuse
       // them.
       ByteString result = ByteString.copyFrom(this.buf, 0, this.count);
@@ -159,7 +169,8 @@ public class WALCellCodec implements Codec {
       return result;
     }
 
-    private void writeCompressed(byte[] data, Dictionary dict) throws IOException {
+    private void writeCompressed(byte[] data, Enum dictIndex) throws IOException {
+      Dictionary dict = compressionContext.getDictionary(dictIndex);
       assert dict != null;
       short dictIdx = dict.findEntry(data, 0, data.length);
       if (dictIdx == Dictionary.NOT_IN_DICTIONARY) {
@@ -172,6 +183,22 @@ public class WALCellCodec implements Codec {
     }
   }
 
+  static class NoneCompressor implements ByteStringCompressor {
+
+    @Override
+    public ByteString compress(byte[] data, Enum dictIndex) {
+      return UnsafeByteOperations.unsafeWrap(data);
+    }
+  }
+
+  static class NoneUncompressor implements ByteStringUncompressor {
+
+    @Override
+    public byte[] uncompress(ByteString data, Enum dictIndex) {
+      return data.toByteArray();
+    }
+  }
+
   private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
     InputStream in = bs.newInput();
     byte status = (byte)in.read();
@@ -209,9 +236,12 @@ public class WALCellCodec implements Codec {
       // To support tags
       int tagsLength = cell.getTagsLength();
       StreamUtils.writeRawVInt32(out, tagsLength);
-      PrivateCellUtil.compressRow(out, cell, compression.rowDict);
-      PrivateCellUtil.compressFamily(out, cell, compression.familyDict);
-      PrivateCellUtil.compressQualifier(out, cell, compression.qualifierDict);
+      PrivateCellUtil.compressRow(out, cell,
+        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
+      PrivateCellUtil.compressFamily(out, cell,
+        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
+      PrivateCellUtil.compressQualifier(out, cell,
+        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
       // Write timestamp, type and value as uncompressed.
       StreamUtils.writeLong(out, cell.getTimestamp());
       out.write(cell.getTypeByte());
@@ -255,19 +285,22 @@ public class WALCellCodec implements Codec {
       pos = Bytes.putInt(backingArray, pos, vlength);
 
       // the row
-      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT, compression.rowDict);
+      int elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_SHORT,
+        compression.getDictionary(CompressionContext.DictionaryIndex.ROW));
       checkLength(elemLen, Short.MAX_VALUE);
       pos = Bytes.putShort(backingArray, pos, (short)elemLen);
       pos += elemLen;
 
       // family
-      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE, compression.familyDict);
+      elemLen = readIntoArray(backingArray, pos + Bytes.SIZEOF_BYTE,
+        compression.getDictionary(CompressionContext.DictionaryIndex.FAMILY));
       checkLength(elemLen, Byte.MAX_VALUE);
       pos = Bytes.putByte(backingArray, pos, (byte)elemLen);
       pos += elemLen;
 
       // qualifier
-      elemLen = readIntoArray(backingArray, pos, compression.qualifierDict);
+      elemLen = readIntoArray(backingArray, pos,
+        compression.getDictionary(CompressionContext.DictionaryIndex.QUALIFIER));
       pos += elemLen;
 
       // timestamp, type and value
@@ -354,12 +387,18 @@ public class WALCellCodec implements Codec {
   }
 
   public ByteStringCompressor getByteStringCompressor() {
-    // TODO: ideally this should also encapsulate compressionContext
-    return new BaosAndCompressor();
+    return new BaosAndCompressor(compression);
   }
 
   public ByteStringUncompressor getByteStringUncompressor() {
-    // TODO: ideally this should also encapsulate compressionContext
-    return this.statelessUncompressor;
+    return new StatelessUncompressor(compression);
+  }
+
+  public static ByteStringCompressor getNoneCompressor() {
+    return new NoneCompressor();
+  }
+
+  public static ByteStringUncompressor getNoneUncompressor() {
+    return new NoneUncompressor();
   }
 }
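
For reviewers, a minimal round-trip sketch of the no-op pair added above; the byte array name is made up for illustration, the dictionary index is ignored by these implementations, and the IOException handling required by the interface is omitted:

    // compress() wraps the bytes unchanged and uncompress() copies them back,
    // so call sites no longer special-case "compression disabled" with nulls.
    WALCellCodec.ByteStringCompressor c = WALCellCodec.getNoneCompressor();
    WALCellCodec.ByteStringUncompressor u = WALCellCodec.getNoneUncompressor();
    ByteString wrapped = c.compress(someBytes, CompressionContext.DictionaryIndex.REGION);
    byte[] copy = u.uncompress(wrapped, CompressionContext.DictionaryIndex.REGION);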

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
index 6dc5001..5f92bbf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ClusterMarkingEntryFilter.java
@@ -60,8 +60,6 @@ public class ClusterMarkingEntryFilter implements WALEntryFilter {
       if (edit != null && !edit.isEmpty()) {
         // Mark that the current cluster has the change
         logKey.addClusterId(clusterId);
-        // We need to set the CC to null else it will be compressed when sent to the sink
-        entry.setCompressionContext(null);
         return entry;
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 3c85737..cf367cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
@@ -278,16 +277,6 @@ public interface WAL extends Closeable, WALFileLengthProvider {
       return key;
     }
 
-    /**
-     * Set compression context for this entry.
-     *
-     * @param compressionContext
-     *          Compression context
-     */
-    public void setCompressionContext(CompressionContext compressionContext) {
-      key.setCompressionContext(compressionContext);
-    }
-
     @Override
     public String toString() {
       return this.key + "=" + this.edit;

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
index 8828239..71cbaf4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
@@ -37,7 +37,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
-import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -117,8 +116,6 @@ public class WALKeyImpl implements WALKey {
    */
   private MultiVersionConcurrencyControl.WriteEntry writeEntry;
 
-  private CompressionContext compressionContext;
-
   public WALKeyImpl() {
     init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
        new ArrayList<>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null, null);
@@ -332,13 +329,6 @@ public class WALKeyImpl implements WALKey {
     this.sequenceId = sequenceId;
   }
 
-  /**
-   * @param compressionContext Compression context to use
-   */
-  public void setCompressionContext(CompressionContext compressionContext) {
-    this.compressionContext = compressionContext;
-  }
-
   /** @return encoded region name */
   @Override
   public byte [] getEncodedRegionName() {
@@ -517,18 +507,13 @@ public class WALKeyImpl implements WALKey {
     this.encodedRegionName = encodedRegionName;
   }
 
-  public WALProtos.WALKey.Builder getBuilder(
-      WALCellCodec.ByteStringCompressor compressor) throws IOException {
+  public WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
+      throws IOException {
     WALProtos.WALKey.Builder builder = WALProtos.WALKey.newBuilder();
-    if (compressionContext == null) {
-      builder.setEncodedRegionName(UnsafeByteOperations.unsafeWrap(this.encodedRegionName));
-      builder.setTableName(UnsafeByteOperations.unsafeWrap(this.tablename.getName()));
-    } else {
-      builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
-          compressionContext.regionDict));
-      builder.setTableName(compressor.compress(this.tablename.getName(),
-          compressionContext.tableDict));
-    }
+    builder.setEncodedRegionName(
+      compressor.compress(this.encodedRegionName, CompressionContext.DictionaryIndex.REGION));
+    builder.setTableName(
+      compressor.compress(this.tablename.getName(), CompressionContext.DictionaryIndex.TABLE));
     builder.setLogSequenceNumber(getSequenceId());
     builder.setWriteTime(writeTime);
     if (this.origLogSeqNum > 0) {
@@ -548,29 +533,22 @@ public class WALKeyImpl implements WALKey {
     }
     if (replicationScope != null) {
       for (Map.Entry<byte[], Integer> e : replicationScope.entrySet()) {
-        ByteString family = (compressionContext == null)
-            ? UnsafeByteOperations.unsafeWrap(e.getKey())
-            : compressor.compress(e.getKey(), compressionContext.familyDict);
-        builder.addScopes(FamilyScope.newBuilder()
-            .setFamily(family).setScopeType(ScopeType.forNumber(e.getValue())));
+        ByteString family =
+            compressor.compress(e.getKey(), CompressionContext.DictionaryIndex.FAMILY);
+        builder.addScopes(FamilyScope.newBuilder().setFamily(family)
+            .setScopeType(ScopeType.forNumber(e.getValue())));
       }
     }
     return builder;
   }
 
   public void readFieldsFromPb(WALProtos.WALKey walKey,
-                               WALCellCodec.ByteStringUncompressor uncompressor)
-      throws IOException {
-    if (this.compressionContext != null) {
-      this.encodedRegionName = uncompressor.uncompress(
-          walKey.getEncodedRegionName(), compressionContext.regionDict);
-      byte[] tablenameBytes = uncompressor.uncompress(
-          walKey.getTableName(), compressionContext.tableDict);
-      this.tablename = TableName.valueOf(tablenameBytes);
-    } else {
-      this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
-      this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
-    }
+      WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
+    this.encodedRegionName = uncompressor.uncompress(walKey.getEncodedRegionName(),
+      CompressionContext.DictionaryIndex.REGION);
+    byte[] tablenameBytes =
+        uncompressor.uncompress(walKey.getTableName(), CompressionContext.DictionaryIndex.TABLE);
+    this.tablename = TableName.valueOf(tablenameBytes);
     clusterIds.clear();
     for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
       clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
@@ -585,14 +563,14 @@ public class WALKeyImpl implements WALKey {
     if (walKey.getScopesCount() > 0) {
       this.replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
       for (FamilyScope scope : walKey.getScopesList()) {
-        byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
-          uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
+        byte[] family =
+            uncompressor.uncompress(scope.getFamily(), CompressionContext.DictionaryIndex.FAMILY);
         this.replicationScope.put(family, scope.getScopeType().getNumber());
       }
     }
     setSequenceId(walKey.getLogSequenceNumber());
     this.writeTime = walKey.getWriteTime();
-    if(walKey.hasOrigSequenceNumber()) {
+    if (walKey.hasOrigSequenceNumber()) {
       this.origLogSeqNum = walKey.getOrigSequenceNumber();
     }
   }
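
With the CompressionContext field gone from WALKeyImpl, serialization and deserialization are symmetric through the codec interfaces. A sketch of that round trip, assuming codec is the WALCellCodec configured for the WAL being written or read, with IOException handling omitted:

    // Serialize: the key builds its protobuf form through whichever compressor
    // the codec hands out (dictionary-backed, or the no-op NoneCompressor).
    WALProtos.WALKey pbKey = key.getBuilder(codec.getByteStringCompressor()).build();

    // Deserialize: the matching uncompressor restores the fields.
    WALKeyImpl restored = new WALKeyImpl();
    restored.readFieldsFromPb(pbKey, codec.getByteStringUncompressor());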

http://git-wip-us.apache.org/repos/asf/hbase/blob/0b28155d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java
index f1508e5..2c74f80 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/FaultyProtobufLogReader.java
@@ -45,9 +45,6 @@ public class FaultyProtobufLogReader extends ProtobufLogReader {
       boolean b;
       do {
         Entry e = new Entry();
-        if (compressionContext != null) {
-          e.setCompressionContext(compressionContext);
-        }
         b = readNext(e);
         nextQueue.offer(e);
         numberOfFileEntries++;
