infraio commented on a change in pull request #2354:
URL: https://github.com/apache/hbase/pull/2354#discussion_r484257892



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationBarrierFamilyFormat.java
##########
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.Cell.Type;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.ClientMetaTableAccessor;
+import org.apache.hadoop.hbase.ClientMetaTableAccessor.QueryType;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Helper class for storing replication barriers in family 'rep_barrier' of meta table.
+ * <p/>
+ * See SerialReplicationChecker on how to make use of the barriers.
+ */
+@InterfaceAudience.Private
+public final class ReplicationBarrierFamilyFormat {
+
+  @VisibleForTesting
+  public static final byte[] REPLICATION_PARENT_QUALIFIER = Bytes.toBytes("parent");
+
+  private static final byte ESCAPE_BYTE = (byte) 0xFF;
+
+  private static final byte SEPARATED_BYTE = 0x00;
+
+  private ReplicationBarrierFamilyFormat() {
+  }
+
+  public static void addReplicationBarrier(Put put, long openSeqNum) throws IOException {
+    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
+      .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(HConstants.SEQNUM_QUALIFIER)
+      .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(Bytes.toBytes(openSeqNum))
+      .build());
+  }
+
+  private static void writeRegionName(ByteArrayOutputStream out, byte[] regionName) {
+    for (byte b : regionName) {
+      if (b == ESCAPE_BYTE) {
+        out.write(ESCAPE_BYTE);
+      }
+      out.write(b);
+    }
+  }
+
+  @VisibleForTesting
+  public static byte[] getParentsBytes(List<RegionInfo> parents) {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Iterator<RegionInfo> iter = parents.iterator();
+    writeRegionName(bos, iter.next().getRegionName());
+    while (iter.hasNext()) {
+      bos.write(ESCAPE_BYTE);
+      bos.write(SEPARATED_BYTE);
+      writeRegionName(bos, iter.next().getRegionName());
+    }
+    return bos.toByteArray();
+  }
+
+  private static List<byte[]> parseParentsBytes(byte[] bytes) {
+    List<byte[]> parents = new ArrayList<>();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    for (int i = 0; i < bytes.length; i++) {
+      if (bytes[i] == ESCAPE_BYTE) {
+        i++;
+        if (bytes[i] == SEPARATED_BYTE) {
+          parents.add(bos.toByteArray());
+          bos.reset();
+          continue;
+        }
+        // fall through to append the byte
+      }
+      bos.write(bytes[i]);
+    }
+    if (bos.size() > 0) {
+      parents.add(bos.toByteArray());
+    }
+    return parents;
+  }
+
+  public static void addReplicationParent(Put put, List<RegionInfo> parents) throws IOException {
+    byte[] value = getParentsBytes(parents);
+    put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(put.getRow())
+      .setFamily(HConstants.REPLICATION_BARRIER_FAMILY).setQualifier(REPLICATION_PARENT_QUALIFIER)
+      .setTimestamp(put.getTimestamp()).setType(Type.Put).setValue(value).build());
+  }
+
+  public static Put makePutForReplicationBarrier(RegionInfo regionInfo, long openSeqNum, long ts)
+    throws IOException {
+    Put put = new Put(regionInfo.getRegionName(), ts);
+    addReplicationBarrier(put, openSeqNum);
+    return put;
+  }
+
+  public static final class ReplicationBarrierResult {
+    private final long[] barriers;
+    private final RegionState.State state;
+    private final List<byte[]> parentRegionNames;
+
+    ReplicationBarrierResult(long[] barriers, State state, List<byte[]> parentRegionNames) {
+      this.barriers = barriers;
+      this.state = state;
+      this.parentRegionNames = parentRegionNames;
+    }
+
+    public long[] getBarriers() {
+      return barriers;
+    }
+
+    public RegionState.State getState() {
+      return state;
+    }
+
+    public List<byte[]> getParentRegionNames() {
+      return parentRegionNames;
+    }
+
+    @Override
+    public String toString() {
+      return "ReplicationBarrierResult [barriers=" + Arrays.toString(barriers) 
+ ", state=" +
+        state + ", parentRegionNames=" +
+        
parentRegionNames.stream().map(Bytes::toStringBinary).collect(Collectors.joining(",
 ")) +
+        "]";
+    }
+  }
+
+  private static long getReplicationBarrier(Cell c) {
+    return Bytes.toLong(c.getValueArray(), c.getValueOffset(), c.getValueLength());
+  }
+
+  public static long[] getReplicationBarriers(Result result) {
+    return result.getColumnCells(HConstants.REPLICATION_BARRIER_FAMILY, HConstants.SEQNUM_QUALIFIER)
+      .stream().mapToLong(ReplicationBarrierFamilyFormat::getReplicationBarrier).sorted().distinct()
+      .toArray();
+  }
+
+  private static ReplicationBarrierResult getReplicationBarrierResult(Result result) {
+    long[] barriers = getReplicationBarriers(result);
+    byte[] stateBytes = result.getValue(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER);
+    RegionState.State state =
+      stateBytes != null ? RegionState.State.valueOf(Bytes.toString(stateBytes)) : null;
+    byte[] parentRegionsBytes =
+      result.getValue(HConstants.REPLICATION_BARRIER_FAMILY, REPLICATION_PARENT_QUALIFIER);
+    List<byte[]> parentRegionNames =
+      parentRegionsBytes != null ? parseParentsBytes(parentRegionsBytes) : Collections.emptyList();
+    return new ReplicationBarrierResult(barriers, state, parentRegionNames);
+  }
+
+  public static ReplicationBarrierResult getReplicationBarrierResult(Connection conn,
+    TableName tableName, byte[] row, byte[] encodedRegionName) throws IOException {
+    byte[] metaStartKey = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
+    byte[] metaStopKey =
+      RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
+    Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey)
+      .addColumn(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER)
+      .addFamily(HConstants.REPLICATION_BARRIER_FAMILY).readAllVersions().setReversed(true)
+      .setCaching(10);
+    try (Table table = conn.getTable(TableName.META_TABLE_NAME);
+      ResultScanner scanner = table.getScanner(scan)) {
+      for (Result result;;) {
+        result = scanner.next();
+        if (result == null) {
+          return new ReplicationBarrierResult(new long[0], null, Collections.emptyList());
+        }
+        byte[] regionName = result.getRow();
+        // TODO: we may look up a region which has already been split or merged so we need to check
+        // whether the encoded name matches. Need to find a way to quit earlier when there is no
+        // record for the given region, for now it will scan to the end of the table.
+        if (!Bytes.equals(encodedRegionName,
+          Bytes.toBytes(RegionInfo.encodeRegionName(regionName)))) {
+          continue;
+        }
+        return getReplicationBarrierResult(result);
+      }
+    }
+  }
+
+  public static long[] getReplicationBarrier(Connection conn, byte[] regionName)

Review comment:
       getReplicationBarrier => getReplicationBarriers? As this method returns an array too.
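
       For illustration only (this call site is hypothetical and not part of the patch; it assumes the rename suggested above), the plural name reads naturally at a caller since the method returns all barrier sequence numbers for a region:

           // hypothetical caller, assuming getReplicationBarrier(Connection, byte[]) is renamed
           long[] barriers = ReplicationBarrierFamilyFormat.getReplicationBarriers(conn, regionName);
           for (long barrier : barriers) {
             // each entry is an open sequence number recorded as a replication barrier
           }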




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

