This is an automated email from the ASF dual-hosted git repository.

achouhan pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new f4881dc  HBASE-22330 Backport HBASE-20724 (Sometimes some compacted storefiles are still opened after region failover) to branch-1
f4881dc is described below

commit f4881dcf1b414d7537f0f70bd2fb24b9e4d281c1
Author: Abhishek Singh Chouhan <[email protected]>
AuthorDate: Mon May 6 17:16:09 2019 -0700

    HBASE-22330 Backport HBASE-20724 (Sometimes some compacted storefiles are still opened after region failover) to branch-1
---
 .../apache/hadoop/hbase/protobuf/ProtobufUtil.java |  31 ++
 .../hbase/protobuf/generated/HFileProtos.java      | 543 ++++++++++++++++++++-
 hbase-protocol/src/main/protobuf/HFile.proto       |   4 +
 .../regionserver/AbstractMultiFileWriter.java      |  12 +-
 .../apache/hadoop/hbase/regionserver/HStore.java   |  61 +--
 .../hadoop/hbase/regionserver/StoreFile.java       |  92 +++-
 .../compactions/DateTieredCompactor.java           |   2 +-
 .../regionserver/compactions/DefaultCompactor.java |   2 +-
 .../regionserver/compactions/StripeCompactor.java  |   2 +-
 .../TestCleanupCompactedFileAfterFailover.java     | 191 ++++++++
 .../TestCleanupCompactedFileOnRegionClose.java     |  49 --
 .../regionserver/compactions/TestCompactor.java    |   8 +
 12 files changed, 880 insertions(+), 117 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 36e495d..2c70ad7 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -48,11 +48,14 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
@@ -137,6 +140,7 @@ import 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import 
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.HFileProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
 import 
org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
@@ -3578,4 +3582,31 @@ public final class ProtobufUtil {
     return new TimeRange(minStamp, maxStamp);
   }
 
+  public static byte[] toCompactionEventTrackerBytes(Set<String> storeFiles) {
+    HFileProtos.CompactionEventTracker.Builder builder =
+        HFileProtos.CompactionEventTracker.newBuilder();
+    for (String sf : storeFiles) {
+      builder.addCompactedStoreFile(ByteString.copyFromUtf8(sf));
+    }
+    return ProtobufUtil.prependPBMagic(builder.build().toByteArray());
+  }
+
+  public static Set<String> toCompactedStoreFiles(byte[] bytes) throws 
IOException {
+    if (bytes != null && ProtobufUtil.isPBMagicPrefix(bytes)) {
+      int pbLen = ProtobufUtil.lengthOfPBMagic();
+      HFileProtos.CompactionEventTracker.Builder builder =
+          HFileProtos.CompactionEventTracker.newBuilder();
+      ProtobufUtil.mergeFrom(builder, bytes, pbLen, bytes.length - pbLen);
+      HFileProtos.CompactionEventTracker compactionEventTracker = 
builder.build();
+      List<ByteString> compactedStoreFiles = 
compactionEventTracker.getCompactedStoreFileList();
+      if (compactedStoreFiles != null && compactedStoreFiles.size() != 0) {
+        Set<String> compactedStoreFileSet = new HashSet<>();
+        for (ByteString sf : compactedStoreFiles) {
+          compactedStoreFileSet.add(sf.toStringUtf8());
+        }
+        return compactedStoreFileSet;
+      }
+    }
+    return Collections.emptySet();
+  }
 }
diff --git 
a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java
 
b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java
index 5b6f2f4..7f146b7 100644
--- 
a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java
+++ 
b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/HFileProtos.java
@@ -8,6 +8,503 @@ public final class HFileProtos {
   public static void registerAllExtensions(
       com.google.protobuf.ExtensionRegistry registry) {
   }
+  public interface CompactionEventTrackerOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+
+    // repeated bytes compacted_store_file = 1;
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    java.util.List<com.google.protobuf.ByteString> getCompactedStoreFileList();
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    int getCompactedStoreFileCount();
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    com.google.protobuf.ByteString getCompactedStoreFile(int index);
+  }
+  /**
+   * Protobuf type {@code hbase.pb.CompactionEventTracker}
+   */
+  public static final class CompactionEventTracker extends
+      com.google.protobuf.GeneratedMessage
+      implements CompactionEventTrackerOrBuilder {
+    // Use CompactionEventTracker.newBuilder() to construct.
+    private 
CompactionEventTracker(com.google.protobuf.GeneratedMessage.Builder<?> builder) 
{
+      super(builder);
+      this.unknownFields = builder.getUnknownFields();
+    }
+    private CompactionEventTracker(boolean noInit) { this.unknownFields = 
com.google.protobuf.UnknownFieldSet.getDefaultInstance(); }
+
+    private static final CompactionEventTracker defaultInstance;
+    public static CompactionEventTracker getDefaultInstance() {
+      return defaultInstance;
+    }
+
+    public CompactionEventTracker getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+
+    private final com.google.protobuf.UnknownFieldSet unknownFields;
+    @java.lang.Override
+    public final com.google.protobuf.UnknownFieldSet
+        getUnknownFields() {
+      return this.unknownFields;
+    }
+    private CompactionEventTracker(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      initFields();
+      int mutable_bitField0_ = 0;
+      com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder();
+      try {
+        boolean done = false;
+        while (!done) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              done = true;
+              break;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                done = true;
+              }
+              break;
+            }
+            case 10: {
+              if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+                compactedStoreFile_ = new 
java.util.ArrayList<com.google.protobuf.ByteString>();
+                mutable_bitField0_ |= 0x00000001;
+              }
+              compactedStoreFile_.add(input.readBytes());
+              break;
+            }
+          }
+        }
+      } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+        throw e.setUnfinishedMessage(this);
+      } catch (java.io.IOException e) {
+        throw new com.google.protobuf.InvalidProtocolBufferException(
+            e.getMessage()).setUnfinishedMessage(this);
+      } finally {
+        if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
+          compactedStoreFile_ = 
java.util.Collections.unmodifiableList(compactedStoreFile_);
+        }
+        this.unknownFields = unknownFields.build();
+        makeExtensionsImmutable();
+      }
+    }
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_descriptor;
+    }
+
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable
+          .ensureFieldAccessorsInitialized(
+              
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.class,
 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.Builder.class);
+    }
+
+    public static com.google.protobuf.Parser<CompactionEventTracker> PARSER =
+        new com.google.protobuf.AbstractParser<CompactionEventTracker>() {
+      public CompactionEventTracker parsePartialFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        return new CompactionEventTracker(input, extensionRegistry);
+      }
+    };
+
+    @java.lang.Override
+    public com.google.protobuf.Parser<CompactionEventTracker> 
getParserForType() {
+      return PARSER;
+    }
+
+    // repeated bytes compacted_store_file = 1;
+    public static final int COMPACTED_STORE_FILE_FIELD_NUMBER = 1;
+    private java.util.List<com.google.protobuf.ByteString> compactedStoreFile_;
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    public java.util.List<com.google.protobuf.ByteString>
+        getCompactedStoreFileList() {
+      return compactedStoreFile_;
+    }
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    public int getCompactedStoreFileCount() {
+      return compactedStoreFile_.size();
+    }
+    /**
+     * <code>repeated bytes compacted_store_file = 1;</code>
+     */
+    public com.google.protobuf.ByteString getCompactedStoreFile(int index) {
+      return compactedStoreFile_.get(index);
+    }
+
+    private void initFields() {
+      compactedStoreFile_ = java.util.Collections.emptyList();
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+
+      memoizedIsInitialized = 1;
+      return true;
+    }
+
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      for (int i = 0; i < compactedStoreFile_.size(); i++) {
+        output.writeBytes(1, compactedStoreFile_.get(i));
+      }
+      getUnknownFields().writeTo(output);
+    }
+
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+
+      size = 0;
+      {
+        int dataSize = 0;
+        for (int i = 0; i < compactedStoreFile_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(compactedStoreFile_.get(i));
+        }
+        size += dataSize;
+        size += 1 * getCompactedStoreFileList().size();
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+
+    @java.lang.Override
+    public boolean equals(final java.lang.Object obj) {
+      if (obj == this) {
+       return true;
+      }
+      if (!(obj instanceof 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker)) 
{
+        return super.equals(obj);
+      }
+      
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
other = 
(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker) 
obj;
+
+      boolean result = true;
+      result = result && getCompactedStoreFileList()
+          .equals(other.getCompactedStoreFileList());
+      result = result &&
+          getUnknownFields().equals(other.getUnknownFields());
+      return result;
+    }
+
+    private int memoizedHashCode = 0;
+    @java.lang.Override
+    public int hashCode() {
+      if (memoizedHashCode != 0) {
+        return memoizedHashCode;
+      }
+      int hash = 41;
+      hash = (19 * hash) + getDescriptorForType().hashCode();
+      if (getCompactedStoreFileCount() > 0) {
+        hash = (37 * hash) + COMPACTED_STORE_FILE_FIELD_NUMBER;
+        hash = (53 * hash) + getCompactedStoreFileList().hashCode();
+      }
+      hash = (29 * hash) + getUnknownFields().hashCode();
+      memoizedHashCode = hash;
+      return hash;
+    }
+
+    public static 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data);
+    }
+    public static 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return PARSER.parseFrom(data, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input);
+    }
+    public static 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseDelimitedFrom(input, extensionRegistry);
+    }
+    public static 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input);
+    }
+    public static 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return PARSER.parseFrom(input, extensionRegistry);
+    }
+
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker
 prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    /**
+     * Protobuf type {@code hbase.pb.CompactionEventTracker}
+     */
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTrackerOrBuilder
 {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_descriptor;
+      }
+
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable
+            .ensureFieldAccessorsInitialized(
+                
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.class,
 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.Builder.class);
+      }
+
+      // Construct using 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+
+      private Builder(
+          com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+
+      public Builder clear() {
+        super.clear();
+        compactedStoreFile_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.internal_static_hbase_pb_CompactionEventTracker_descriptor;
+      }
+
+      public 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
getDefaultInstanceForType() {
+        return 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.getDefaultInstance();
+      }
+
+      public 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
build() {
+        
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+
+      public 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
buildPartial() {
+        
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
result = new 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker(this);
+        int from_bitField0_ = bitField0_;
+        if (((bitField0_ & 0x00000001) == 0x00000001)) {
+          compactedStoreFile_ = 
java.util.Collections.unmodifiableList(compactedStoreFile_);
+          bitField0_ = (bitField0_ & ~0x00000001);
+        }
+        result.compactedStoreFile_ = compactedStoreFile_;
+        onBuilt();
+        return result;
+      }
+
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker) {
+          return 
mergeFrom((org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+
+      public Builder 
mergeFrom(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker
 other) {
+        if (other == 
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker.getDefaultInstance())
 return this;
+        if (!other.compactedStoreFile_.isEmpty()) {
+          if (compactedStoreFile_.isEmpty()) {
+            compactedStoreFile_ = other.compactedStoreFile_;
+            bitField0_ = (bitField0_ & ~0x00000001);
+          } else {
+            ensureCompactedStoreFileIsMutable();
+            compactedStoreFile_.addAll(other.compactedStoreFile_);
+          }
+          onChanged();
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+
+      public final boolean isInitialized() {
+        return true;
+      }
+
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        
org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker 
parsedMessage = null;
+        try {
+          parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
+        } catch (com.google.protobuf.InvalidProtocolBufferException e) {
+          parsedMessage = 
(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.CompactionEventTracker) 
e.getUnfinishedMessage();
+          throw e;
+        } finally {
+          if (parsedMessage != null) {
+            mergeFrom(parsedMessage);
+          }
+        }
+        return this;
+      }
+      private int bitField0_;
+
+      // repeated bytes compacted_store_file = 1;
+      private java.util.List<com.google.protobuf.ByteString> 
compactedStoreFile_ = java.util.Collections.emptyList();
+      private void ensureCompactedStoreFileIsMutable() {
+        if (!((bitField0_ & 0x00000001) == 0x00000001)) {
+          compactedStoreFile_ = new 
java.util.ArrayList<com.google.protobuf.ByteString>(compactedStoreFile_);
+          bitField0_ |= 0x00000001;
+         }
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public java.util.List<com.google.protobuf.ByteString>
+          getCompactedStoreFileList() {
+        return java.util.Collections.unmodifiableList(compactedStoreFile_);
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public int getCompactedStoreFileCount() {
+        return compactedStoreFile_.size();
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public com.google.protobuf.ByteString getCompactedStoreFile(int index) {
+        return compactedStoreFile_.get(index);
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public Builder setCompactedStoreFile(
+          int index, com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureCompactedStoreFileIsMutable();
+        compactedStoreFile_.set(index, value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public Builder addCompactedStoreFile(com.google.protobuf.ByteString 
value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureCompactedStoreFileIsMutable();
+        compactedStoreFile_.add(value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public Builder addAllCompactedStoreFile(
+          java.lang.Iterable<? extends com.google.protobuf.ByteString> values) 
{
+        ensureCompactedStoreFileIsMutable();
+        super.addAll(values, compactedStoreFile_);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated bytes compacted_store_file = 1;</code>
+       */
+      public Builder clearCompactedStoreFile() {
+        compactedStoreFile_ = java.util.Collections.emptyList();
+        bitField0_ = (bitField0_ & ~0x00000001);
+        onChanged();
+        return this;
+      }
+
+      // 
@@protoc_insertion_point(builder_scope:hbase.pb.CompactionEventTracker)
+    }
+
+    static {
+      defaultInstance = new CompactionEventTracker(true);
+      defaultInstance.initFields();
+    }
+
+    // @@protoc_insertion_point(class_scope:hbase.pb.CompactionEventTracker)
+  }
+
   public interface FileInfoProtoOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
 
@@ -2338,6 +2835,11 @@ public final class HFileProtos {
   }
 
   private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_hbase_pb_CompactionEventTracker_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
     internal_static_hbase_pb_FileInfoProto_descriptor;
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -2356,35 +2858,42 @@ public final class HFileProtos {
       descriptor;
   static {
     java.lang.String[] descriptorData = {
-      "\n\013HFile.proto\022\010hbase.pb\032\013HBase.proto\"<\n\r" +
-      "FileInfoProto\022+\n\tmap_entry\030\001 \003(\0132\030.hbase" +
-      ".pb.BytesBytesPair\"\221\003\n\020FileTrailerProto\022" +
-      "\030\n\020file_info_offset\030\001 \001(\004\022 \n\030load_on_ope" +
-      "n_data_offset\030\002 \001(\004\022$\n\034uncompressed_data" +
-      "_index_size\030\003 \001(\004\022 \n\030total_uncompressed_" +
-      "bytes\030\004 \001(\004\022\030\n\020data_index_count\030\005 
\001(\r\022\030\n" +
-      "\020meta_index_count\030\006 \001(\r\022\023\n\013entry_count\030\007" +
-      " \001(\004\022\035\n\025num_data_index_levels\030\010 
\001(\r\022\037\n\027f" +
-      "irst_data_block_offset\030\t \001(\004\022\036\n\026last_dat",
-      "a_block_offset\030\n \001(\004\022\035\n\025comparator_class" +
-      "_name\030\013 \001(\t\022\031\n\021compression_codec\030\014 
\001(\r\022\026" +
-      "\n\016encryption_key\030\r \001(\014BA\n*org.apache.had" +
-      "oop.hbase.protobuf.generatedB\013HFileProto" +
-      "sH\001\210\001\001\240\001\001"
+      "\n\013HFile.proto\022\010hbase.pb\032\013HBase.proto\"6\n\026" +
+      "CompactionEventTracker\022\034\n\024compacted_stor" +
+      "e_file\030\001 \003(\014\"<\n\rFileInfoProto\022+\n\tmap_ent" +
+      "ry\030\001 \003(\0132\030.hbase.pb.BytesBytesPair\"\221\003\n\020F" +
+      "ileTrailerProto\022\030\n\020file_info_offset\030\001 \001(" +
+      "\004\022 \n\030load_on_open_data_offset\030\002 \001(\004\022$\n\034u" +
+      "ncompressed_data_index_size\030\003 \001(\004\022 \n\030tot" +
+      "al_uncompressed_bytes\030\004 \001(\004\022\030\n\020data_inde" +
+      "x_count\030\005 \001(\r\022\030\n\020meta_index_count\030\006 
\001(\r\022" +
+      "\023\n\013entry_count\030\007 \001(\004\022\035\n\025num_data_index_l",
+      "evels\030\010 \001(\r\022\037\n\027first_data_block_offset\030\t" +
+      " \001(\004\022\036\n\026last_data_block_offset\030\n 
\001(\004\022\035\n\025" +
+      "comparator_class_name\030\013 \001(\t\022\031\n\021compressi" +
+      "on_codec\030\014 \001(\r\022\026\n\016encryption_key\030\r \001(\014BA" 
+
+      "\n*org.apache.hadoop.hbase.protobuf.gener" +
+      "atedB\013HFileProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
         public com.google.protobuf.ExtensionRegistry assignDescriptors(
             com.google.protobuf.Descriptors.FileDescriptor root) {
           descriptor = root;
-          internal_static_hbase_pb_FileInfoProto_descriptor =
+          internal_static_hbase_pb_CompactionEventTracker_descriptor =
             getDescriptor().getMessageTypes().get(0);
+          internal_static_hbase_pb_CompactionEventTracker_fieldAccessorTable = 
new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_hbase_pb_CompactionEventTracker_descriptor,
+              new java.lang.String[] { "CompactedStoreFile", });
+          internal_static_hbase_pb_FileInfoProto_descriptor =
+            getDescriptor().getMessageTypes().get(1);
           internal_static_hbase_pb_FileInfoProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_FileInfoProto_descriptor,
               new java.lang.String[] { "MapEntry", });
           internal_static_hbase_pb_FileTrailerProto_descriptor =
-            getDescriptor().getMessageTypes().get(1);
+            getDescriptor().getMessageTypes().get(2);
           internal_static_hbase_pb_FileTrailerProto_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_FileTrailerProto_descriptor,
diff --git a/hbase-protocol/src/main/protobuf/HFile.proto 
b/hbase-protocol/src/main/protobuf/HFile.proto
index 5c5e4f3..6988331 100644
--- a/hbase-protocol/src/main/protobuf/HFile.proto
+++ b/hbase-protocol/src/main/protobuf/HFile.proto
@@ -26,6 +26,10 @@ option optimize_for = SPEED;
 
 import "HBase.proto";
 
+message CompactionEventTracker {
+  repeated bytes compacted_store_file = 1;
+}
+
 // Map of name/values
 message FileInfoProto {
   repeated BytesBytesPair map_entry = 1;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
index 4987c59..eade5ec 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMultiFileWriter.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -65,18 +66,23 @@ public abstract class AbstractMultiFileWriter implements 
CellSink {
    * comments in HBASE-15400 for more details.
    */
   public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) 
throws IOException {
+    return commitWriters(maxSeqId, majorCompaction, 
Collections.<StoreFile>emptySet());
+  }
+
+  public List<Path> commitWriters(long maxSeqId, boolean majorCompaction,
+      Collection<StoreFile> storeFiles) throws IOException {
     preCommitWriters();
     Collection<StoreFile.Writer> writers = this.writers();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId
-          + ", majorCompaction=" + majorCompaction);
+      LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId 
+ ", majorCompaction="
+          + majorCompaction);
     }
     List<Path> paths = new ArrayList<Path>();
     for (Writer writer : writers) {
       if (writer == null) {
         continue;
       }
-      writer.appendMetadata(maxSeqId, majorCompaction);
+      writer.appendMetadata(maxSeqId, majorCompaction, storeFiles);
       preCloseWriter(writer);
       paths.add(writer.getPath());
       writer.close();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 6f8c4cd..0dce438 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -565,6 +565,7 @@ public class HStore implements Store {
       totalValidStoreFile++;
     }
 
+    Set<String> compactedStoreFiles = new HashSet<>();
     ArrayList<StoreFile> results = new ArrayList<StoreFile>(files.size());
     IOException ioe = null;
     try {
@@ -577,6 +578,7 @@ public class HStore implements Store {
               LOG.debug("loaded " + storeFile.toStringDetailed());
             }
             results.add(storeFile);
+            compactedStoreFiles.addAll(storeFile.getCompactedStoreFiles());
           }
         } catch (InterruptedException e) {
           if (ioe == null) ioe = new InterruptedIOException(e.getMessage());
@@ -601,6 +603,21 @@ public class HStore implements Store {
       throw ioe;
     }
 
+    // Remove the compacted files from result
+    List<StoreFile> filesToRemove = new 
ArrayList<>(compactedStoreFiles.size());
+    for (StoreFile storeFile : results) {
+      if (compactedStoreFiles.contains(storeFile.getPath().getName())) {
+        LOG.warn("Clearing the compacted storefile " + storeFile + " from this 
store");
+        storeFile.getReader().close(true);
+        filesToRemove.add(storeFile);
+      }
+    }
+    results.removeAll(filesToRemove);
+    if (!filesToRemove.isEmpty() && this.isPrimaryReplicaStore()) {
+      LOG.debug("Moving the files " + filesToRemove + " to archive");
+      this.fs.removeStoreFiles(this.getColumnFamilyName(), filesToRemove);
+    }
+
     return results;
   }
 
@@ -907,7 +924,7 @@ public class HStore implements Store {
           storeEngine.getStoreFileManager().clearCompactedFiles();
       // clear the compacted files
       if (compactedfiles != null && !compactedfiles.isEmpty()) {
-        removeCompactedfiles(compactedfiles, true);
+        removeCompactedfiles(compactedfiles);
       }
       if (!result.isEmpty()) {
         // initialize the thread pool for closing store files in parallel.
@@ -1116,7 +1133,8 @@ public class HStore implements Store {
             .withMaxKeyCount(maxKeyCount)
             .withFavoredNodes(favoredNodes)
             .withFileContext(hFileContext)
-            .withShouldDropCacheBehind(shouldDropBehind);
+            .withShouldDropCacheBehind(shouldDropBehind)
+            .withCompactedFiles(this.getCompactedfiles());
     if (trt != null) {
       builder.withTimeRangeTracker(trt);
     }
@@ -2721,11 +2739,6 @@ public class HStore implements Store {
 
   @Override
   public synchronized void closeAndArchiveCompactedFiles() throws IOException {
-    closeAndArchiveCompactedFiles(false);
-  }
-
-  @VisibleForTesting
-  public synchronized void closeAndArchiveCompactedFiles(boolean storeClosing) 
throws IOException {
     // ensure other threads do not attempt to archive the same files on close()
     archiveLock.lock();
     try {
@@ -2746,7 +2759,7 @@ public class HStore implements Store {
         lock.readLock().unlock();
       }
       if (copyCompactedfiles != null && !copyCompactedfiles.isEmpty()) {
-        removeCompactedfiles(copyCompactedfiles, storeClosing);
+        removeCompactedfiles(copyCompactedfiles);
       }
     } finally {
       archiveLock.unlock();
@@ -2758,7 +2771,7 @@ public class HStore implements Store {
    * @param compactedfiles The compacted files in this store that are not 
active in reads
    * @throws IOException
    */
-  private void removeCompactedfiles(Collection<StoreFile> compactedfiles, 
boolean storeClosing)
+  private void removeCompactedfiles(Collection<StoreFile> compactedfiles)
       throws IOException {
     final List<StoreFile> filesToRemove = new 
ArrayList<StoreFile>(compactedfiles.size());
     for (final StoreFile file : compactedfiles) {
@@ -2766,16 +2779,6 @@ public class HStore implements Store {
         try {
           StoreFile.Reader r = file.getReader();
 
-          //Compacted files in the list should always be marked compacted 
away. In the event
-          //they're contradicting in order to guarantee data consistency
-          //should we choose one and ignore the other?
-          if (storeClosing && r != null && !r.isCompactedAway()) {
-            String msg =
-                "Region closing but StoreFile is in compacted list but not 
compacted away: " +
-                    file.getPath();
-            throw new IllegalStateException(msg);
-          }
-
           if (r == null) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("The file " + file + " was closed but still not 
archived.");
@@ -2783,13 +2786,7 @@ public class HStore implements Store {
             filesToRemove.add(file);
           }
 
-          //If store is closing we're ignoring any references to keep things 
consistent
-          //and remove compacted storefiles from the region directory
-          if (r != null && file.isCompactedAway() && (!r.isReferencedInReads() 
|| storeClosing)) {
-            if (storeClosing && r.isReferencedInReads()) {
-              LOG.warn("Region closing but StoreFile still has references: 
file=" +
-                  file.getPath() + ", refCount=" + r.getRefCount());
-            }
+          if (r != null && r.isCompactedAway() && !r.isReferencedInReads()) {
             // Even if deleting fails we need not bother as any new scanners 
won't be
             // able to use the compacted file as the status is already 
compactedAway
             if (LOG.isTraceEnabled()) {
@@ -2805,16 +2802,8 @@ public class HStore implements Store {
                 + ", refCount=" + r.getRefCount() + ", skipping for now.");
           }
         } catch (Exception e) {
-          String msg = "Exception while trying to close the compacted store 
file " +
-              file.getPath();
-          if (storeClosing) {
-            msg = "Store is closing. " + msg;
-          }
-          LOG.error(msg, e);
-          //if we get an exception let caller know so it can abort the server
-          if (storeClosing) {
-            throw new IOException(msg, e);
-          }
+          LOG.error(
+            "Exception while trying to close the compacted store file " + 
file.getPath().getName());
         }
       }
     }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 4c5cd7c..d9fd1d9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -26,7 +26,9 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.util.BloomFilter;
 import org.apache.hadoop.hbase.util.BloomFilterFactory;
@@ -117,6 +120,11 @@ public class StoreFile {
   /** Key for timestamp of earliest-put in metadata*/
   public static final byte[] EARLIEST_PUT_TS = 
Bytes.toBytes("EARLIEST_PUT_TS");
 
+  /**
+   * Key for compaction event which contains the compacted storefiles in 
FileInfo
+   */
+  public static final byte[] COMPACTION_EVENT_KEY = 
Bytes.toBytes("COMPACTION_EVENT_KEY");
+
   private final StoreFileInfo fileInfo;
   private final FileSystem fs;
 
@@ -170,6 +178,9 @@ public class StoreFile {
   // It's set whenever you get a Reader.
   private boolean excludeFromMinorCompaction = false;
 
+  // This file was produced by compacting these store files
+  private final Set<String> compactedStoreFiles = new HashSet<>();
+
   /** Meta key set when store file is a result of a bulk load */
   public static final byte[] BULKLOAD_TASK_KEY =
     Bytes.toBytes("BULKLOAD_SOURCE_TASK");
@@ -512,6 +523,14 @@ public class StoreFile {
           "proceeding without", e);
       this.reader.timeRange = null;
     }
+
+    try {
+      byte[] data = metadataMap.get(COMPACTION_EVENT_KEY);
+      
this.compactedStoreFiles.addAll(ProtobufUtil.toCompactedStoreFiles(data));
+    } catch (IOException e) {
+      LOG.error("Error reading compacted storefiles from meta data", e);
+    }
+
     // initialize so we can reuse them after reader closed.
     firstKey = reader.getFirstKey();
     lastKey = reader.getLastKey();
@@ -624,6 +643,7 @@ public class StoreFile {
     private HFileContext fileContext;
     private TimeRangeTracker trt;
     private boolean shouldDropCacheBehind;
+    private Collection<StoreFile> compactedFiles = Collections.emptySet();
 
     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs) {
@@ -707,6 +727,11 @@ public class StoreFile {
       return this;
     }
 
+    public WriterBuilder withCompactedFiles(Collection<StoreFile> 
compactedFiles) {
+      this.compactedFiles = compactedFiles;
+      return this;
+    }
+
     /**
      * Create a store file writer. Client is responsible for closing file when
      * done. If metadata, add BEFORE closing using
@@ -748,7 +773,7 @@ public class StoreFile {
       }
       return new Writer(fs, filePath,
           conf, cacheConf, comparator, bloomType, maxKeyCount, favoredNodes, 
fileContext,
-          shouldDropCacheBehind, trt);
+          shouldDropCacheBehind, trt, compactedFiles);
     }
   }
 
@@ -808,6 +833,10 @@ public class StoreFile {
     return null;
   }
 
+  Set<String> getCompactedStoreFiles() {
+    return Collections.unmodifiableSet(this.compactedStoreFiles);
+  }
+
   /**
    * A StoreFile writer.  Use this to read/write HBase Store Files. It is 
package
    * local because it is an implementation detail of the HBase regionserver.
@@ -835,6 +864,7 @@ public class StoreFile {
     final TimeRangeTracker timeRangeTracker;
 
     protected HFile.Writer writer;
+    private final Collection<StoreFile> compactedFiles;
 
     /**
      * Creates an HFile.Writer that also write helpful meta data.
@@ -857,7 +887,7 @@ public class StoreFile {
         InetSocketAddress[] favoredNodes, HFileContext fileContext, boolean 
shouldDropCacheBehind)
             throws IOException {
       this(fs, path, conf, cacheConf, comparator, bloomType, maxKeys, 
favoredNodes, fileContext,
-          shouldDropCacheBehind, null);
+          shouldDropCacheBehind, null, Collections.<StoreFile>emptySet());
     }
 
     /**
@@ -873,6 +903,7 @@ public class StoreFile {
      * @param fileContext - The HFile context
      * @param shouldDropCacheBehind Drop pages written to page cache after 
writing the store file.
      * @param trt Ready-made timetracker to use.
+     * @param compactedFiles Compacted files which are not yet archived
      * @throws IOException problem writing to FS
      */
     private Writer(FileSystem fs, Path path,
@@ -880,8 +911,11 @@ public class StoreFile {
         CacheConfig cacheConf,
         final KVComparator comparator, BloomType bloomType, long maxKeys,
         InetSocketAddress[] favoredNodes, HFileContext fileContext,
-        boolean shouldDropCacheBehind, final TimeRangeTracker trt)
-            throws IOException {
+        boolean shouldDropCacheBehind, final TimeRangeTracker trt,
+        Collection<StoreFile> compactedFiles)
+        throws IOException {
+      this.compactedFiles =
+          (compactedFiles == null ? Collections.<StoreFile> emptySet() : 
compactedFiles);
       // If passed a TimeRangeTracker, use it. Set timeRangeTrackerSet so we 
don't destroy it.
       // TODO: put the state of the TRT on the TRT; i.e. make a read-only 
version (TimeRange) when
       // it no longer writable.
@@ -926,21 +960,61 @@ public class StoreFile {
     }
 
     /**
-     * Writes meta data.
-     * Call before {@link #close()} since its written as meta data to this 
file.
+     * Writes meta data. Call before {@link #close()} since it's written as meta data to this file.
      * @param maxSequenceId Maximum sequence id.
      * @param majorCompaction True if this file is product of a major 
compaction
      * @throws IOException problem writing to FS
      */
     public void appendMetadata(final long maxSequenceId, final boolean 
majorCompaction)
-    throws IOException {
+        throws IOException {
+      appendMetadata(maxSequenceId, majorCompaction, Collections.<StoreFile> 
emptySet());
+    }
+
+    /**
+     * Writes meta data. Call before {@link #close()} since it's written as meta data to this file.
+     * @param maxSequenceId Maximum sequence id.
+     * @param majorCompaction True if this file is product of a major 
compaction
+     * @param storeFiles The compacted store files to generate this new file
+     * @throws IOException problem writing to FS
+     */
+    public void appendMetadata(final long maxSequenceId, final boolean 
majorCompaction,
+        final Collection<StoreFile> storeFiles) throws IOException {
       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
-      writer.appendFileInfo(MAJOR_COMPACTION_KEY,
-          Bytes.toBytes(majorCompaction));
+      writer.appendFileInfo(MAJOR_COMPACTION_KEY, 
Bytes.toBytes(majorCompaction));
+      writer.appendFileInfo(COMPACTION_EVENT_KEY, 
toCompactionEventTrackerBytes(storeFiles));
       appendTrackedTimestampsToMetadata();
     }
 
     /**
+     * Used when writing {@link StoreFile#COMPACTION_EVENT_KEY} to the new file's file info. The
+     * names of the compacted store files are needed. If a compacted store file is itself the
+     * result of a compaction, its compacted files which are still not archived are needed, too.
+     * There is no need to add compacted files recursively. If files A, B, C are compacted to new
+     * file D, and file D is compacted to new file E, A, B, C, D are written to file E's compacted
+     * files. So if file E is compacted to new file F, E is added to F's compacted files first,
+     * then E's compacted files A, B, C, D. There is no need to add D's compacted files, as they
+     * are already in E's compacted files. See HBASE-20724 for more details.
+     * @param storeFiles The compacted store files to generate this new file
+     * @return bytes of CompactionEventTracker
+     */
+    private byte[] toCompactionEventTrackerBytes(Collection<StoreFile> 
storeFiles) {
+      Set<String> notArchivedCompactedStoreFiles = new HashSet<>();
+      for (StoreFile sf : this.compactedFiles) {
+        notArchivedCompactedStoreFiles.add(sf.getPath().getName());
+      }
+      Set<String> compactedStoreFiles = new HashSet<>();
+      for (StoreFile storeFile : storeFiles) {
+        compactedStoreFiles.add(storeFile.getFileInfo().getPath().getName());
+        for (String csf : storeFile.getCompactedStoreFiles()) {
+          if (notArchivedCompactedStoreFiles.contains(csf)) {
+            compactedStoreFiles.add(csf);
+          }
+        }
+      }
+      return ProtobufUtil.toCompactionEventTrackerBytes(compactedStoreFiles);
+    }
+
+    /**
      * Add TimestampRange and earliest put timestamp to Metadata
      */
     public void appendTrackedTimestampsToMetadata() throws IOException {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
index b1203c5c..e5f5b81 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DateTieredCompactor.java
@@ -74,6 +74,6 @@ public class DateTieredCompactor extends 
AbstractMultiOutputCompactor<DateTiered
   @Override
   protected List<Path> commitWriter(DateTieredMultiFileWriter writer, 
FileDetails fd,
       CompactionRequest request) throws IOException {
-    return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
+    return writer.commitWriters(fd.maxSeqId, request.isAllFiles(), 
request.getFiles());
   }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 9759d2b..bea8b1e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -87,7 +87,7 @@ public class DefaultCompactor extends Compactor<Writer> {
   protected List<Path> commitWriter(Writer writer, FileDetails fd,
       CompactionRequest request) throws IOException {
     List<Path> newFiles = Lists.newArrayList(writer.getPath());
-    writer.appendMetadata(fd.maxSeqId, request.isAllFiles());
+    writer.appendMetadata(fd.maxSeqId, request.isAllFiles(), 
request.getFiles());
     writer.close();
     return newFiles;
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 5e796ad..526b32a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -126,7 +126,7 @@ public class StripeCompactor extends 
AbstractMultiOutputCompactor<StripeMultiFil
   @Override
   protected List<Path> commitWriter(StripeMultiFileWriter writer, FileDetails 
fd,
       CompactionRequest request) throws IOException {
-    List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
+    List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor(), 
request.getFiles());
     assert !newFiles.isEmpty() : "Should have produced an empty file to 
preserve metadata.";
     return newFiles;
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java
new file mode 100644
index 0000000..edd1f6f
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileAfterFailover.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
+import 
org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({LargeTests.class})
+public class TestCleanupCompactedFileAfterFailover {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestCleanupCompactedFileAfterFailover.class);
+
+  private static HBaseTestingUtility TEST_UTIL;
+  private static Admin admin;
+  private static Table table;
+
+  private static TableName TABLE_NAME = 
TableName.valueOf("TestCleanupCompactedFileAfterFailover");
+  private static byte[] ROW = Bytes.toBytes("row");
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+  private static byte[] QUALIFIER = Bytes.toBytes("cq");
+  private static byte[] VALUE = Bytes.toBytes("value");
+  private static final int RS_NUMBER = 5;
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    TEST_UTIL = new HBaseTestingUtility();
+    // Set the scanner lease to 20min, so the scanner can't be closed by 
RegionServer
+    
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
 1200000);
+    TEST_UTIL.getConfiguration()
+        .setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 100);
+    TEST_UTIL.getConfiguration().set("dfs.blocksize", "64000");
+    TEST_UTIL.getConfiguration().set("dfs.namenode.fs-limits.min-block-size", 
"1024");
+    TEST_UTIL.getConfiguration().set(TimeToLiveHFileCleaner.TTL_CONF_KEY, "0");
+    TEST_UTIL.startMiniCluster(RS_NUMBER);
+    admin = TEST_UTIL.getHBaseAdmin();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws Exception {
+    HTableDescriptor htd = new HTableDescriptor(TABLE_NAME);
+    htd.addFamily(new HColumnDescriptor(FAMILY));
+    admin.createTable(htd);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    table = TEST_UTIL.getConnection().getTable(TABLE_NAME);
+  }
+
+  @After
+  public void after() throws Exception {
+    admin.disableTable(TABLE_NAME);
+    admin.deleteTable(TABLE_NAME);
+  }
+
+  @Test
+  public void testCleanupAfterFailoverWithCompactOnce() throws Exception {
+    testCleanupAfterFailover(1);
+  }
+
+  @Test
+  public void testCleanupAfterFailoverWithCompactTwice() throws Exception {
+    testCleanupAfterFailover(2);
+  }
+
+  @Test
+  public void testCleanupAfterFailoverWithCompactThreeTimes() throws Exception 
{
+    testCleanupAfterFailover(3);
+  }
+
+  private void testCleanupAfterFailover(int compactNum) throws Exception {
+    HRegionServer rsServedTable = null;
+    List<Region> regions = new ArrayList<>();
+    for (JVMClusterUtil.RegionServerThread rsThread : 
TEST_UTIL.getHBaseCluster()
+        .getLiveRegionServerThreads()) {
+      HRegionServer rs = rsThread.getRegionServer();
+      if (rs.getOnlineTables().contains(TABLE_NAME)) {
+        regions.addAll(rs.getOnlineRegions(TABLE_NAME));
+        rsServedTable = rs;
+      }
+    }
+    assertNotNull(rsServedTable);
+    assertEquals("Table should only have one region", 1, regions.size());
+    HRegion region = (HRegion)regions.get(0);
+    HStore store = (HStore)region.getStore(FAMILY);
+
+    writeDataAndFlush(3, region);
+    assertEquals(3, store.getStorefilesCount());
+
+    // Open a scanner and not close, then the storefile will be referenced
+    store.getScanner(new Scan(), null, Long.MAX_VALUE);
+
+    region.compact(true);
+    assertEquals(1, store.getStorefilesCount());
+    // The compacted file should not be archived as there are references by 
user scanner
+    assertEquals(3, 
store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+
+    for (int i = 1; i < compactNum; i++) {
+      // Compact again
+      region.compact(true);
+      assertEquals(1, store.getStorefilesCount());
+      store.closeAndArchiveCompactedFiles();
+      // Compacted storefiles should still be 3 as the new compacted storefile was archived
+      assertEquals(3, 
store.getStoreEngine().getStoreFileManager().getCompactedfiles().size());
+    }
+
+    int walNum = rsServedTable.getWALs().size();
+    // Roll WAL
+    rsServedTable.walRoller.requestRollAll();
+    // Flush again
+    region.flush(true);
+    // The WAL which contains compaction event marker should be archived
+    assertEquals("The old WAL should be archived", walNum, 
rsServedTable.getWALs().size());
+
+    rsServedTable.kill();
+    // Sleep to wait failover
+    Thread.sleep(3000);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+
+    regions.clear();
+    for (JVMClusterUtil.RegionServerThread rsThread : 
TEST_UTIL.getHBaseCluster()
+        .getLiveRegionServerThreads()) {
+      HRegionServer rs = rsThread.getRegionServer();
+      if (rs != rsServedTable && rs.getOnlineTables().contains(TABLE_NAME)) {
+        regions.addAll(rs.getOnlineRegions(TABLE_NAME));
+      }
+    }
+    assertEquals("Table should only have one region", 1, regions.size());
+    region = (HRegion)regions.get(0);
+    store = (HStore)region.getStore(FAMILY);
+    // The compacted storefile should be cleaned and only have 1 storefile
+    assertEquals(1, store.getStorefilesCount());
+  }
+
+  private void writeDataAndFlush(int fileNum, HRegion region) throws Exception 
{
+    for (int i = 0; i < fileNum; i++) {
+      for (int j = 0; j < 100; j++) {
+        table.put(new Put(concat(ROW, j)).addColumn(FAMILY, QUALIFIER, 
concat(VALUE, j)));
+      }
+      region.flush(true);
+    }
+  }
+
+  private byte[] concat(byte[] base, int index) {
+    return Bytes.toBytes(Bytes.toString(base) + "-" + index);
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
index dfc9209..f54c636 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCleanupCompactedFileOnRegionClose.java
@@ -23,18 +23,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
 import java.util.Collection;
 
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -145,48 +140,4 @@ public class TestCleanupCompactedFileOnRegionClose {
     
assertNull(((HStore)region.getStore(familyNameBytes)).getStoreEngine().getStoreFileManager()
         .getCompactedfiles());
   }
-
-  @Test
-  public void testIOExceptionThrownOnClose() throws Exception {
-    byte[] filler = new byte[128000];
-    TableName tableName = TableName.valueOf("testIOExceptionThrownOnClose");
-    String familyName = "f";
-    byte[] familyNameBytes = Bytes.toBytes(familyName);
-    util.createTable(tableName, familyName);
-
-    Table table = util.getConnection().getTable(tableName);
-
-    HRegionServer rs = util.getRSForFirstRegionInTable(tableName);
-    Region region = rs.getOnlineRegions(tableName).get(0);
-
-    int refSFCount = 4;
-    for (int i = 0; i < refSFCount; i++) {
-      for (int j = 0; j < refSFCount; j++) {
-        Put put = new Put(Bytes.toBytes(j));
-        put.addColumn(familyNameBytes, Bytes.toBytes(i), filler);
-        table.put(put);
-      }
-      util.flush(tableName);
-    }
-    assertEquals(refSFCount, region.getStoreFileList(new 
byte[][]{familyNameBytes}).size());
-
-    HStore store = (HStore)region.getStore(familyNameBytes);
-    StoreFile hsf = 
region.getStore(familyNameBytes).getStorefiles().iterator().next();
-    long readPt = region.getReadpoint(IsolationLevel.READ_COMMITTED);
-    StoreFileScanner preadScanner = hsf.getReader().getStoreFileScanner(
-        false, true, false, readPt, 0, false);
-    preadScanner.seek(KeyValue.LOWESTKEY);
-
-    //Major compact to produce compacted storefiles that need to be cleaned up
-    util.compact(tableName, true);
-    assertNotNull(preadScanner.next());
-    store.closeAndArchiveCompactedFiles(true);
-
-    try {
-      assertNotNull(preadScanner.next());
-      fail("Expected IOException");
-    }catch (IOException ex) {
-      ex.printStackTrace();
-    }
-  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
index 0de3dbf..c8bf76a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCompactor.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
@@ -110,6 +111,13 @@ public class TestCompactor {
           return null;
         }
       }).when(writer).appendMetadata(any(long.class), any(boolean.class));
+      doAnswer(new Answer<Void>() {
+        @Override
+        public Void answer(InvocationOnMock invocation) throws Throwable {
+          realWriter.hasMetadata = true;
+          return null;
+        }
+      }).when(writer).appendMetadata(any(long.class), any(boolean.class), 
anyCollection());
       doAnswer(new Answer<Path>() {
         @Override
         public Path answer(InvocationOnMock invocation) throws Throwable {

Reply via email to