HBASE-19665 Add table based replication peers/queues storage back
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/31978c31 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/31978c31 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/31978c31 Branch: refs/heads/master Commit: 31978c31bbf363d98c50cc6b293105a085888471 Parents: 641c87d Author: huzheng <open...@gmail.com> Authored: Mon Mar 5 19:45:45 2018 +0800 Committer: huzheng <open...@gmail.com> Committed: Wed Mar 14 15:42:16 2018 +0800 ---------------------------------------------------------------------- .../replication/ReplicationPeerStorage.java | 3 +- .../replication/ReplicationStorageFactory.java | 20 +- .../hbase/replication/ReplicationUtils.java | 13 + .../TableReplicationPeerStorage.java | 171 ++++++ .../TableReplicationQueueStorage.java | 522 +++++++++++++++++++ .../TableReplicationStorageBase.java | 127 +++++ .../replication/ZKReplicationPeerStorage.java | 16 +- .../replication/ZKReplicationQueueStorage.java | 6 +- .../replication/TestReplicationStateBasic.java | 363 ------------- .../replication/TestReplicationStateZKImpl.java | 95 ---- .../TestZKReplicationPeerStorage.java | 178 ------- .../TestZKReplicationQueueStorage.java | 252 --------- .../TestReplicationSourceManager.java | 6 +- .../storage/TestReplicationStateBasic.java | 370 +++++++++++++ .../storage/TestReplicationStateTableImpl.java | 129 +++++ .../storage/TestReplicationStateZKImpl.java | 98 ++++ .../storage/TestZKReplicationPeerStorage.java | 182 +++++++ .../storage/TestZKReplicationQueueStorage.java | 255 +++++++++ 18 files changed, 1899 insertions(+), 907 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java index 1adda02..4684f08 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java @@ -42,7 +42,8 @@ public interface ReplicationPeerStorage { /** * Set the state of peer, {@code true} to {@code ENABLED}, otherwise to {@code DISABLED}. - * @throws ReplicationException if there are errors accessing the storage service. + * @throws ReplicationException if there are errors accessing the storage service or peer does not + * exist. */ void setPeerState(String peerId, boolean enabled) throws ReplicationException; http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java index 462cfed..cbfec3b 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.replication; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; @@ -29,6 +30,15 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public final class ReplicationStorageFactory { + public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl"; + public static final String DEFAULT_REPLICATION_PEER_STORAGE_IMPL = + ZKReplicationPeerStorage.class.getName(); + + public static final String REPLICATION_QUEUE_STORAGE_IMPL = + "hbase.replication.queue.storage.impl"; + public static final String DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL = + ZKReplicationQueueStorage.class.getName(); + private ReplicationStorageFactory() { } @@ -36,7 +46,10 @@ public final class ReplicationStorageFactory { * Create a new {@link ReplicationPeerStorage}. */ public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) { - return new ZKReplicationPeerStorage(zk, conf); + String peerStorageClass = + conf.get(REPLICATION_PEER_STORAGE_IMPL, DEFAULT_REPLICATION_PEER_STORAGE_IMPL); + return ReflectionUtils.instantiateWithCustomCtor(peerStorageClass, + new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, conf }); } /** @@ -44,6 +57,9 @@ public final class ReplicationStorageFactory { */ public static ReplicationQueueStorage getReplicationQueueStorage(ZKWatcher zk, Configuration conf) { - return new ZKReplicationQueueStorage(zk, conf); + String queueStorageClass = + conf.get(REPLICATION_QUEUE_STORAGE_IMPL, DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL); + return ReflectionUtils.instantiateWithCustomCtor(queueStorageClass, + new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, conf }); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index e2479e0..2e86c17 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.replication; +import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.toByteArray; + import java.io.IOException; import java.util.Collection; import java.util.List; @@ -30,12 +32,19 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; + /** * Helper class for replication. */ @InterfaceAudience.Private public final class ReplicationUtils { + public static final byte[] PEER_STATE_ENABLED_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); + public static final byte[] PEER_STATE_DISABLED_BYTES = + toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); + private ReplicationUtils() { } @@ -173,4 +182,8 @@ public final class ReplicationUtils { return tableCFs != null && tableCFs.containsKey(tableName); } } + + public static String parsePeerIdFromQueueId(String queueId) { + return new ReplicationQueueInfo(queueId).getPeerId(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java new file mode 100644 index 0000000..ee7969b --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; +import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Table based replication peer storage. + */ +@InterfaceAudience.Private +public class TableReplicationPeerStorage extends TableReplicationStorageBase + implements ReplicationPeerStorage { + + public TableReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) throws IOException { + super(zookeeper, conf); + } + + @Override + public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Put put = new Put(Bytes.toBytes(peerId)); + put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG, + ReplicationPeerConfigUtil.toByteArray(peerConfig)); + put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE, + enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES); + table.put(put); + } catch (IOException e) { + throw new ReplicationException("Failed to add peer " + peerId, e); + } + } + + @Override + public void removePeer(String peerId) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Delete delete = new Delete(Bytes.toBytes(peerId)); + table.delete(delete); + } catch (IOException e) { + throw new ReplicationException("Failed to remove peer " + peerId, e); + } + } + + // TODO make it to be a checkExistAndMutate operation. + private boolean peerExist(String peerId, Table table) throws IOException { + Get get = new Get(Bytes.toBytes(peerId)); + get.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE); + return table.exists(get); + } + + @Override + public void setPeerState(String peerId, boolean enabled) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + if (!peerExist(peerId, table)) { + throw new ReplicationException("Peer " + peerId + " does not exist."); + } + Put put = new Put(Bytes.toBytes(peerId)); + put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE, + enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES); + table.put(put); + } catch (IOException e) { + throw new ReplicationException( + "Failed to set peer state, peerId=" + peerId + ", state=" + enabled, e); + } + } + + @Override + public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + if (!peerExist(peerId, table)) { + throw new ReplicationException("Peer " + peerId + " does not exist."); + } + Put put = new Put(Bytes.toBytes(peerId)); + put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG, + ReplicationPeerConfigUtil.toByteArray(peerConfig)); + table.put(put); + } catch (IOException e) { + throw new ReplicationException("Failed to update peer configuration, peerId=" + peerId, e); + } + } + + @Override + public List<String> listPeerIds() throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Scan scan = new Scan().addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE); + try (ResultScanner scanner = table.getScanner(scan)) { + List<String> peerIds = new ArrayList<>(); + for (Result r : scanner) { + peerIds.add(Bytes.toString(r.getRow())); + } + return peerIds; + } + } catch (IOException e) { + throw new ReplicationException("Failed to list peers", e); + } + } + + @Override + public boolean isPeerEnabled(String peerId) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE); + Result r = table.get(get); + if (r == null) { + throw new ReplicationException("Peer " + peerId + " does not found"); + } + return Arrays.equals(PEER_STATE_ENABLED_BYTES, r.getValue(FAMILY_PEER, QUALIFIER_PEER_STATE)); + } catch (IOException e) { + throw new ReplicationException("Failed to read the peer state, peerId=" + peerId, e); + } + } + + @Override + public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG); + Result r = table.get(get); + if (r == null) { + throw new ReplicationException("Peer " + peerId + " does not found"); + } + byte[] data = r.getValue(FAMILY_PEER, QUALIFIER_PEER_CONFIG); + if (data == null || data.length == 0) { + throw new ReplicationException( + "Replication peer config data shouldn't be empty, peerId=" + peerId); + } + try { + return ReplicationPeerConfigUtil.parsePeerFrom(data); + } catch (DeserializationException e) { + throw new ReplicationException( + "Failed to parse replication peer config for peer with id=" + peerId, e); + } + } catch (IOException e) { + throw new ReplicationException( + "Failed to read the peer configuration in hbase:replication, peerId=" + peerId, e); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java new file mode 100644 index 0000000..abb279d --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java @@ -0,0 +1,522 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CollectionUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Table based replication queue storage. + */ +@InterfaceAudience.Private +public class TableReplicationQueueStorage extends TableReplicationStorageBase + implements ReplicationQueueStorage { + + private static final Logger LOG = LoggerFactory.getLogger(TableReplicationQueueStorage.class); + + public TableReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) throws IOException { + super(zookeeper, conf); + } + + /** + * Serialize the {fileName, position} pair into a byte array. + */ + private static byte[] makeByteArray(String fileName, long position) { + byte[] data = new byte[Bytes.SIZEOF_INT + fileName.length() + Bytes.SIZEOF_LONG]; + int pos = 0; + pos = Bytes.putInt(data, pos, fileName.length()); + pos = Bytes.putBytes(data, pos, Bytes.toBytes(fileName), 0, fileName.length()); + pos = Bytes.putLong(data, pos, position); + assert pos == data.length; + return data; + } + + /** + * Deserialize the byte array into a {filename, position} pair. + */ + private static Pair<String, Long> parseFileNameAndPosition(byte[] data, int offset) + throws IOException { + if (data == null) { + throw new IOException("The byte array shouldn't be null"); + } + int pos = offset; + int len = Bytes.toInt(data, pos, Bytes.SIZEOF_INT); + pos += Bytes.SIZEOF_INT; + if (pos + len > data.length) { + throw new IllegalArgumentException("offset (" + pos + ") + length (" + len + ") exceed the" + + " capacity of the array: " + data.length); + } + String fileName = Bytes.toString(Bytes.copy(data, pos, len)); + pos += len; + long position = Bytes.toLong(data, pos, Bytes.SIZEOF_LONG); + return new Pair<>(fileName, position); + } + + @Override + public void removeQueue(ServerName serverName, String queueId) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Delete delete = new Delete(getServerNameRowKey(serverName)); + delete.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId)); + // Delete all <fileName, position> pairs. + delete.addColumns(FAMILY_WAL, Bytes.toBytes(queueId), HConstants.LATEST_TIMESTAMP); + table.delete(delete); + } catch (IOException e) { + throw new ReplicationException( + "Failed to remove wal from queue, serverName=" + serverName + ", queueId=" + queueId, e); + } + } + + @Override + public void addWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Put put = new Put(getServerNameRowKey(serverName)); + put.addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED, HConstants.EMPTY_BYTE_ARRAY); + put.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId), HConstants.EMPTY_BYTE_ARRAY); + put.addColumn(FAMILY_WAL, Bytes.toBytes(queueId), makeByteArray(fileName, 0L)); + table.put(put); + } catch (IOException e) { + throw new ReplicationException("Failed to add wal to queue, serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName, e); + } + } + + @Override + public void removeWAL(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream() + .filter(w -> w.fileNameMatch(fileName)).findFirst(); + if (walCell.isPresent()) { + Delete delete = new Delete(getServerNameRowKey(walCell.get().serverName)) + .addColumn(FAMILY_WAL, Bytes.toBytes(queueId), walCell.get().cellTimestamp); + table.delete(delete); + } else { + LOG.warn(fileName + " has already been deleted when removing log"); + } + } catch (IOException e) { + throw new ReplicationException("Failed to remove wal from queue, serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName, e); + } + } + + @Override + public void setWALPosition(ServerName serverName, String queueId, String fileName, long position, + Map<String, Long> lastSeqIds) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream() + .filter(w -> w.fileNameMatch(fileName)).findFirst(); + if (walCell.isPresent()) { + List<Put> puts = new ArrayList<>(); + Put put = new Put(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL, + Bytes.toBytes(walCell.get().queueId), walCell.get().cellTimestamp, + makeByteArray(fileName, position)); + puts.add(put); + // Update the last pushed sequence id for each region in a batch. + String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId); + if (lastSeqIds != null && lastSeqIds.size() > 0) { + for (Map.Entry<String, Long> e : lastSeqIds.entrySet()) { + Put regionPut = new Put(Bytes.toBytes(peerId)).addColumn(FAMILY_REGIONS, + getRegionQualifier(e.getKey()), Bytes.toBytes(e.getValue())); + puts.add(regionPut); + } + } + table.put(puts); + } else { + throw new ReplicationException("WAL file " + fileName + " does not found under queue " + + queueId + " for server " + serverName); + } + } catch (IOException e) { + throw new ReplicationException( + "Failed to set wal position and last sequence ids, serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position, + e); + } + } + + @Override + public long getLastSequenceId(String encodedRegionName, String peerId) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Get get = new Get(Bytes.toBytes(peerId)); + get.addColumn(FAMILY_REGIONS, getRegionQualifier(encodedRegionName)); + Result r = table.get(get); + if (r == null || r.listCells() == null) { + return HConstants.NO_SEQNUM; + } + return Bytes.toLong(r.getValue(FAMILY_REGIONS, getRegionQualifier(encodedRegionName))); + } catch (IOException e) { + throw new ReplicationException( + "Failed to get last sequence id, region=" + encodedRegionName + ", peerId=" + peerId, e); + } + } + + @Override + public long getWALPosition(ServerName serverName, String queueId, String fileName) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream() + .filter(w -> w.fileNameMatch(fileName)).findFirst(); + if (walCell.isPresent()) { + return walCell.get().position; + } else { + LOG.warn("WAL " + fileName + " does not found under queue " + queueId + " for server " + + serverName); + return 0; + } + } catch (IOException e) { + throw new ReplicationException("Failed to get wal position. serverName=" + serverName + + ", queueId=" + queueId + ", fileName=" + fileName, e); + } + } + + /** + * Each cell in column wal:{queueId} will be parsed to a WALCell. The WALCell will be more + * friendly to upper layer. + */ + private static final class WALCell { + ServerName serverName; + String queueId; + String wal; + long position; + long cellTimestamp;// Timestamp of the cell + + private WALCell(ServerName serverName, String queueId, String wal, long position, + long cellTimestamp) { + this.serverName = serverName; + this.queueId = queueId; + this.wal = wal; + this.position = position; + this.cellTimestamp = cellTimestamp; + } + + public static WALCell create(Cell cell) throws IOException { + ServerName serverName = ServerName.parseServerName( + Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())); + String queueId = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength()); + Pair<String, Long> fileAndPos = + parseFileNameAndPosition(cell.getValueArray(), cell.getValueOffset()); + return new WALCell(serverName, queueId, fileAndPos.getFirst(), fileAndPos.getSecond(), + cell.getTimestamp()); + } + + public boolean fileNameMatch(String fileName) { + return StringUtils.equals(wal, fileName); + } + } + + /** + * Parse the WALCell list from a HBase result. + */ + private List<WALCell> result2WALCells(Result r) throws IOException { + List<WALCell> wals = new ArrayList<>(); + if (r != null && r.listCells() != null && r.listCells().size() > 0) { + for (Cell cell : r.listCells()) { + wals.add(WALCell.create(cell)); + } + } + return wals; + } + + /** + * List all WALs for the specific region server and queueId. + */ + private List<WALCell> getWALsInQueue0(Table table, ServerName serverName, String queueId) + throws IOException { + Get get = new Get(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL, Bytes.toBytes(queueId)) + .readAllVersions(); + return result2WALCells(table.get(get)); + } + + @Override + public List<String> getWALsInQueue(ServerName serverName, String queueId) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + return getWALsInQueue0(table, serverName, queueId).stream().map(p -> p.wal) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new ReplicationException( + "Failed to get wals in queue. serverName=" + serverName + ", queueId=" + queueId, e); + } + } + + @Override + public List<String> getAllQueues(ServerName serverName) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + List<String> queues = new ArrayList<>(); + Get get = new Get(getServerNameRowKey(serverName)).addFamily(FAMILY_QUEUE); + Result r = table.get(get); + if (r != null && r.listCells() != null && r.listCells().size() > 0) { + for (Cell c : r.listCells()) { + String queue = + Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()); + queues.add(queue); + } + } + return queues; + } catch (IOException e) { + throw new ReplicationException("Failed to get all queues. serverName=" + serverName, e); + } + } + + @Override + public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId, + ServerName destServerName) throws ReplicationException { + LOG.info( + "Atomically moving " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName); + try (Table table = getReplicationMetaTable()) { + // Create an enabled region server for destination. + byte[] destServerNameRowKey = getServerNameRowKey(destServerName); + byte[] srcServerNameRowKey = getServerNameRowKey(sourceServerName); + Put put = new Put(destServerNameRowKey).addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED, + HConstants.EMPTY_BYTE_ARRAY); + table.put(put); + List<WALCell> wals = getWALsInQueue0(table, sourceServerName, queueId); + String newQueueId = queueId + "-" + sourceServerName; + // Remove the queue in source region server if wal set of the queue is empty. + if (CollectionUtils.isEmpty(wals)) { + Delete delete = + new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId)) + .addColumns(FAMILY_WAL, Bytes.toBytes(queueId), HConstants.LATEST_TIMESTAMP); + table.delete(delete); + LOG.info("Removed " + sourceServerName + "/" + queueId + " since it's empty"); + return new Pair<>(newQueueId, Collections.emptySortedSet()); + } + // Transfer all wals from source region server to destination region server in a batch. + List<Mutation> mutations = new ArrayList<>(); + // a. Create queue for destination server. + mutations.add(new Put(destServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(newQueueId), + HConstants.EMPTY_BYTE_ARRAY)); + SortedSet<String> logQueue = new TreeSet<>(); + for (WALCell wal : wals) { + byte[] data = makeByteArray(wal.wal, wal.cellTimestamp); + // b. Add wal to destination server. + mutations.add( + new Put(destServerNameRowKey).addColumn(FAMILY_WAL, Bytes.toBytes(newQueueId), data)); + // c. Remove wal from source server. + mutations.add(new Delete(srcServerNameRowKey).addColumn(FAMILY_WAL, Bytes.toBytes(queueId), + wal.cellTimestamp)); + logQueue.add(wal.wal); + } + // d. Remove the queue of source server. + mutations + .add(new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId))); + Object[] results = new Object[mutations.size()]; + table.batch(mutations, results); + boolean allSuccess = Stream.of(results).allMatch(r -> r != null); + if (!allSuccess) { + throw new ReplicationException("Claim queue queueId=" + queueId + " from " + + sourceServerName + " to " + destServerName + " failed, not all mutations success."); + } + LOG.info( + "Atomically moved " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName); + return new Pair<>(newQueueId, logQueue); + } catch (IOException | InterruptedException e) { + throw new ReplicationException("Claim queue queueId=" + queueId + " from " + sourceServerName + + " to " + destServerName + " failed", e); + } + } + + @Override + public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException { + // TODO Make this to be a checkAndDelete, and provide a UT for it. + try (Table table = getReplicationMetaTable()) { + Get get = new Get(getServerNameRowKey(serverName)).addFamily(FAMILY_WAL).readAllVersions(); + Result r = table.get(get); + if (r == null || r.listCells() == null || r.listCells().size() == 0) { + Delete delete = new Delete(getServerNameRowKey(serverName)); + table.delete(delete); + } + } catch (IOException e) { + throw new ReplicationException( + "Failed to remove replicator when queue is empty, serverName=" + serverName, e); + } + } + + @Override + public List<ServerName> getListOfReplicators() throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Scan scan = new Scan().addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED).readVersions(1); + Set<ServerName> serverNames = new HashSet<>(); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result r : scanner) { + if (r.listCells().size() > 0) { + Cell firstCell = r.listCells().get(0); + String serverName = Bytes.toString(firstCell.getRowArray(), firstCell.getRowOffset(), + firstCell.getRowLength()); + serverNames.add(ServerName.parseServerName(serverName)); + } + } + } + return new ArrayList<>(serverNames); + } catch (IOException e) { + throw new ReplicationException("Failed to get list of replicators", e); + } + } + + @Override + public Set<String> getAllWALs() throws ReplicationException { + Set<String> walSet = new HashSet<>(); + try (Table table = getReplicationMetaTable()) { + Scan scan = new Scan().addFamily(FAMILY_WAL).readAllVersions(); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result r : scanner) { + result2WALCells(r).forEach(w -> walSet.add(w.wal)); + } + } + return walSet; + } catch (IOException e) { + throw new ReplicationException("Failed to get all wals", e); + } + } + + @Override + public void addPeerToHFileRefs(String peerId) throws ReplicationException { + // Need to do nothing. + } + + @Override + public void removePeerFromHFileRefs(String peerId) throws ReplicationException { + // Need to do nothing. + } + + @Override + public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) + throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + List<Put> puts = new ArrayList<>(); + for (Pair<Path, Path> p : pairs) { + Put put = new Put(Bytes.toBytes(peerId)); + put.addColumn(FAMILY_HFILE_REFS, Bytes.toBytes(p.getSecond().getName()), + HConstants.EMPTY_BYTE_ARRAY); + puts.add(put); + } + table.put(puts); + } catch (IOException e) { + throw new ReplicationException("Failed to add hfile refs, peerId=" + peerId, e); + } + } + + @Override + public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + List<Delete> deletes = new ArrayList<>(); + for (String file : files) { + Delete delete = new Delete(Bytes.toBytes(peerId)); + delete.addColumns(FAMILY_HFILE_REFS, Bytes.toBytes(file)); + deletes.add(delete); + } + table.delete(deletes); + } catch (IOException e) { + throw new ReplicationException("Failed to remove hfile refs, peerId=" + peerId, e); + } + } + + @Override + public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Set<String> peers = new HashSet<>(); + Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS); + try (ResultScanner scanner = table.getScanner(scan)) { + for (Result r : scanner) { + if (r.listCells().size() > 0) { + Cell firstCell = r.listCells().get(0); + String peerId = Bytes.toString(firstCell.getRowArray(), firstCell.getRowOffset(), + firstCell.getRowLength()); + peers.add(peerId); + } + } + } + return new ArrayList<>(peers); + } catch (IOException e) { + throw new ReplicationException("Faield to get all peers by reading hbase:replication meta", + e); + } + } + + @Override + public List<String> getReplicableHFiles(String peerId) throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Get get = new Get(Bytes.toBytes(peerId)).addFamily(FAMILY_HFILE_REFS); + Result r = table.get(get); + List<String> hfiles = new ArrayList<>(); + if (r != null && r.listCells() != null) { + for (Cell c : r.listCells()) { + hfiles.add( + Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength())); + } + } + return hfiles; + } catch (IOException e) { + throw new ReplicationException("Failed to get replicable hfiles, peerId=" + peerId, e); + } + } + + @Override + public Set<String> getAllHFileRefs() throws ReplicationException { + try (Table table = getReplicationMetaTable()) { + Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS); + try (ResultScanner scanner = table.getScanner(scan)) { + Set<String> hfileSet = new HashSet<>(); + for (Result r : scanner) { + for (Cell c : r.listCells()) { + String hfile = Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), + c.getQualifierLength()); + hfileSet.add(hfile); + } + } + return hfileSet; + } + } catch (IOException e) { + throw new ReplicationException("Failed to get all hfile refs", e); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java new file mode 100644 index 0000000..fd2b574 --- /dev/null +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + +@InterfaceAudience.Private +public class TableReplicationStorageBase { + protected final ZKWatcher zookeeper; + protected final Configuration conf; + + public static final TableName REPLICATION_TABLE = + TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication"); + + // Peer family, the row key would be peer id. + public static final byte[] FAMILY_PEER = Bytes.toBytes("peer"); + public static final byte[] QUALIFIER_PEER_CONFIG = Bytes.toBytes("config"); + public static final byte[] QUALIFIER_PEER_STATE = Bytes.toBytes("state"); + + // Region server state family, the row key would be name of region server. + public static final byte[] FAMILY_RS_STATE = Bytes.toBytes("rs_state"); + public static final byte[] QUALIFIER_STATE_ENABLED = Bytes.toBytes("enabled"); + + // Queue and wal family, the row key would be name of region server. + public static final byte[] FAMILY_QUEUE = Bytes.toBytes("queue"); + public static final byte[] FAMILY_WAL = Bytes.toBytes("wal"); + + // HFile-Refs family, the row key would be peer id. + public static final byte[] FAMILY_HFILE_REFS = Bytes.toBytes("hfile-refs"); + + // Region family, the row key would be peer id. + public static final byte[] FAMILY_REGIONS = Bytes.toBytes("regions"); + + private Connection connection; + + protected static byte[] getServerNameRowKey(ServerName serverName) { + return Bytes.toBytes(serverName.toString()); + } + + protected static byte[] getRegionQualifier(String encodedRegionName) { + return Bytes.toBytes(encodedRegionName); + } + + @VisibleForTesting + public static TableDescriptorBuilder createReplicationTableDescBuilder(final Configuration conf) + throws IOException { + int metaMaxVersion = + conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS); + int metaBlockSize = + conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE); + return TableDescriptorBuilder + .newBuilder(REPLICATION_TABLE) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_PEER).setMaxVersions(metaMaxVersion) + .setInMemory(true).setBlocksize(metaBlockSize) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE) + .build()) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_RS_STATE).setMaxVersions(metaMaxVersion) + .setInMemory(true).setBlocksize(metaBlockSize) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE) + .build()) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_QUEUE).setMaxVersions(metaMaxVersion) + .setInMemory(true).setBlocksize(metaBlockSize) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE) + .build()) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_WAL) + .setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true) + .setBlocksize(metaBlockSize).setScope(HConstants.REPLICATION_SCOPE_LOCAL) + .setBloomFilterType(BloomType.NONE).build()) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_REGIONS).setMaxVersions(metaMaxVersion) + .setInMemory(true).setBlocksize(metaBlockSize) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE) + .build()) + .addColumnFamily( + ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_HFILE_REFS) + .setMaxVersions(metaMaxVersion).setInMemory(true).setBlocksize(metaBlockSize) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE) + .build()); + } + + protected TableReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) + throws IOException { + this.zookeeper = zookeeper; + this.conf = conf; + this.connection = ConnectionFactory.createConnection(conf); + } + + protected Table getReplicationMetaTable() throws IOException { + return this.connection.getTable(REPLICATION_TABLE); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java index a53500a..138f14a 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.replication; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES; + import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -31,8 +34,6 @@ import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; - /** * ZK based replication peer storage. */ @@ -46,11 +47,6 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state"; public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state"; - public static final byte[] ENABLED_ZNODE_BYTES = - toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); - public static final byte[] DISABLED_ZNODE_BYTES = - toByteArray(ReplicationProtos.ReplicationState.State.DISABLED); - /** * The name of the znode that contains the replication status of a remote slave (i.e. peer) * cluster. @@ -89,7 +85,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase ZKUtilOp.createAndFailSilent(getPeerNode(peerId), ReplicationPeerConfigUtil.toByteArray(peerConfig)), ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId), - enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)), + enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES)), false); } catch (KeeperException e) { throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" @@ -108,7 +104,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase @Override public void setPeerState(String peerId, boolean enabled) throws ReplicationException { - byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES; + byte[] stateBytes = enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES; try { ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes); } catch (KeeperException e) { @@ -140,7 +136,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase @Override public boolean isPeerEnabled(String peerId) throws ReplicationException { try { - return Arrays.equals(ENABLED_ZNODE_BYTES, + return Arrays.equals(PEER_STATE_ENABLED_BYTES, ZKUtil.getData(zookeeper, getPeerStateNode(peerId))); } catch (KeeperException | InterruptedException e) { throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e); http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 9a281b1..fa0ff0e 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -79,7 +79,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti * </pre> */ @InterfaceAudience.Private -class ZKReplicationQueueStorage extends ZKReplicationStorageBase +public class ZKReplicationQueueStorage extends ZKReplicationStorageBase implements ReplicationQueueStorage { private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class); @@ -199,7 +199,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase // Persist the max sequence id(s) of regions for serial replication atomically. if (lastSeqIds != null && lastSeqIds.size() > 0) { for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) { - String peerId = new ReplicationQueueInfo(queueId).getPeerId(); + String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId); String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); /* * Make sure the existence of path @@ -382,7 +382,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase // will be overridden in UTs @VisibleForTesting - protected int getQueuesZNodeCversion() throws KeeperException { + public int getQueuesZNodeCversion() throws KeeperException { Stat stat = new Stat(); ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat); return stat.getCversion(); http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java deleted file mode 100644 index 5999c1f..0000000 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ /dev/null @@ -1,363 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import static org.hamcrest.CoreMatchers.hasItems; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.zookeeper.KeeperException; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; - -/** - * White box testing for replication state interfaces. Implementations should extend this class, and - * initialize the interfaces properly. - */ -public abstract class TestReplicationStateBasic { - - private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class); - - protected ReplicationQueueStorage rqs; - protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345); - protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345); - protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345); - protected ReplicationPeers rp; - protected static final String ID_ONE = "1"; - protected static final String ID_TWO = "2"; - protected static String KEY_ONE; - protected static String KEY_TWO; - - // For testing when we try to replicate to ourself - protected String OUR_KEY; - - protected static int zkTimeoutCount; - protected static final int ZK_MAX_COUNT = 300; - protected static final int ZK_SLEEP_INTERVAL = 100; // millis - - @Test - public void testReplicationQueueStorage() throws ReplicationException { - // Test methods with empty state - assertEquals(0, rqs.getListOfReplicators().size()); - assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty()); - assertTrue(rqs.getAllQueues(server1).isEmpty()); - - /* - * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each -- - * server2: zero queues - */ - rqs.addWAL(server1, "qId1", "trash"); - rqs.removeWAL(server1, "qId1", "trash"); - rqs.addWAL(server1,"qId2", "filename1"); - rqs.addWAL(server1,"qId3", "filename2"); - rqs.addWAL(server1,"qId3", "filename3"); - rqs.addWAL(server2,"trash", "trash"); - rqs.removeQueue(server2,"trash"); - - List<ServerName> reps = rqs.getListOfReplicators(); - assertEquals(2, reps.size()); - assertTrue(server1.getServerName(), reps.contains(server1)); - assertTrue(server2.getServerName(), reps.contains(server2)); - - assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty()); - assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); - assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size()); - assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size()); - assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0)); - - assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty()); - assertEquals(0, rqs.getAllQueues(server2).size()); - List<String> list = rqs.getAllQueues(server1); - assertEquals(3, list.size()); - assertTrue(list.contains("qId2")); - assertTrue(list.contains("qId3")); - } - - private void removeAllQueues(ServerName serverName) throws ReplicationException { - for (String queue: rqs.getAllQueues(serverName)) { - rqs.removeQueue(serverName, queue); - } - } - @Test - public void testReplicationQueues() throws ReplicationException { - // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues) - rp.init(); - - rqs.removeQueue(server1, "bogus"); - rqs.removeWAL(server1, "bogus", "bogus"); - removeAllQueues(server1); - assertEquals(0, rqs.getAllQueues(server1).size()); - assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus")); - assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty()); - assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty()); - - populateQueues(); - - assertEquals(3, rqs.getListOfReplicators().size()); - assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); - assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); - assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0")); - rqs.setWALPosition(server3, "qId5", "filename4", 354L, null); - assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4")); - - assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size()); - assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size()); - assertEquals(0, rqs.getAllQueues(server1).size()); - assertEquals(1, rqs.getAllQueues(server2).size()); - assertEquals(5, rqs.getAllQueues(server3).size()); - - assertEquals(0, rqs.getAllQueues(server1).size()); - rqs.removeReplicatorIfQueueIsEmpty(server1); - assertEquals(2, rqs.getListOfReplicators().size()); - - List<String> queues = rqs.getAllQueues(server3); - assertEquals(5, queues.size()); - for (String queue : queues) { - rqs.claimQueue(server3, queue, server2); - } - rqs.removeReplicatorIfQueueIsEmpty(server3); - assertEquals(1, rqs.getListOfReplicators().size()); - - assertEquals(6, rqs.getAllQueues(server2).size()); - removeAllQueues(server2); - rqs.removeReplicatorIfQueueIsEmpty(server2); - assertEquals(0, rqs.getListOfReplicators().size()); - } - - @Test - public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException { - rp.init(); - - List<Pair<Path, Path>> files1 = new ArrayList<>(3); - files1.add(new Pair<>(null, new Path("file_1"))); - files1.add(new Pair<>(null, new Path("file_2"))); - files1.add(new Pair<>(null, new Path("file_3"))); - assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); - assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); - rp.getPeerStorage().addPeer(ID_ONE, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); - rqs.addPeerToHFileRefs(ID_ONE); - rqs.addHFileRefs(ID_ONE, files1); - assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); - assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); - List<String> hfiles2 = new ArrayList<>(files1.size()); - for (Pair<Path, Path> p : files1) { - hfiles2.add(p.getSecond().getName()); - } - String removedString = hfiles2.remove(0); - rqs.removeHFileRefs(ID_ONE, hfiles2); - assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size()); - hfiles2 = new ArrayList<>(1); - hfiles2.add(removedString); - rqs.removeHFileRefs(ID_ONE, hfiles2); - assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size()); - rp.getPeerStorage().removePeer(ID_ONE); - } - - @Test - public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException { - rp.init(); - rp.getPeerStorage().addPeer(ID_ONE, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true); - rqs.addPeerToHFileRefs(ID_ONE); - rp.getPeerStorage().addPeer(ID_TWO, - ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true); - rqs.addPeerToHFileRefs(ID_TWO); - - List<Pair<Path, Path>> files1 = new ArrayList<>(3); - files1.add(new Pair<>(null, new Path("file_1"))); - files1.add(new Pair<>(null, new Path("file_2"))); - files1.add(new Pair<>(null, new Path("file_3"))); - rqs.addHFileRefs(ID_ONE, files1); - rqs.addHFileRefs(ID_TWO, files1); - assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size()); - assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size()); - assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); - - rp.getPeerStorage().removePeer(ID_ONE); - rqs.removePeerFromHFileRefs(ID_ONE); - assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size()); - assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty()); - assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size()); - - rp.getPeerStorage().removePeer(ID_TWO); - rqs.removePeerFromHFileRefs(ID_TWO); - assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size()); - assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty()); - } - - @Test - public void testReplicationPeers() throws Exception { - rp.init(); - - try { - rp.getPeerStorage().setPeerState("bogus", true); - fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); - } catch (ReplicationException e) { - } - try { - rp.getPeerStorage().setPeerState("bogus", false); - fail("Should have thrown an IllegalArgumentException when passed a bogus peerId"); - } catch (ReplicationException e) { - } - - try { - assertFalse(rp.addPeer("bogus")); - fail("Should have thrown an ReplicationException when passed a bogus peerId"); - } catch (ReplicationException e) { - } - - assertNumberOfPeers(0); - - // Add some peers - rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); - assertNumberOfPeers(1); - rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true); - assertNumberOfPeers(2); - - assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils - .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf()))); - rp.getPeerStorage().removePeer(ID_ONE); - rp.removePeer(ID_ONE); - assertNumberOfPeers(1); - - // Add one peer - rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true); - rp.addPeer(ID_ONE); - assertNumberOfPeers(2); - assertTrue(rp.getPeer(ID_ONE).isPeerEnabled()); - rp.getPeerStorage().setPeerState(ID_ONE, false); - // now we do not rely on zk watcher to trigger the state change so we need to trigger it - // manually... - ReplicationPeerImpl peer = rp.getPeer(ID_ONE); - rp.refreshPeerState(peer.getId()); - assertEquals(PeerState.DISABLED, peer.getPeerState()); - assertConnectedPeerStatus(false, ID_ONE); - rp.getPeerStorage().setPeerState(ID_ONE, true); - // now we do not rely on zk watcher to trigger the state change so we need to trigger it - // manually... - rp.refreshPeerState(peer.getId()); - assertEquals(PeerState.ENABLED, peer.getPeerState()); - assertConnectedPeerStatus(true, ID_ONE); - - // Disconnect peer - rp.removePeer(ID_ONE); - assertNumberOfPeers(2); - } - - private String getFileName(String base, int i) { - return String.format(base + "-%04d", i); - } - - @Test - public void testPersistLogPositionAndSeqIdAtomically() throws Exception { - ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); - assertTrue(rqs.getAllQueues(serverName1).isEmpty()); - String queue1 = "1"; - String region0 = "region0", region1 = "region1"; - for (int i = 0; i < 10; i++) { - rqs.addWAL(serverName1, queue1, getFileName("file1", i)); - } - List<String> queueIds = rqs.getAllQueues(serverName1); - assertEquals(1, queueIds.size()); - assertThat(queueIds, hasItems("1")); - - List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1); - assertEquals(10, wals1.size()); - for (int i = 0; i < 10; i++) { - assertThat(wals1, hasItems(getFileName("file1", i))); - } - - for (int i = 0; i < 10; i++) { - assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i))); - } - assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1)); - assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1)); - - for (int i = 0; i < 10; i++) { - rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, - ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L)); - } - - for (int i = 0; i < 10; i++) { - assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i))); - } - assertEquals(900L, rqs.getLastSequenceId(region0, queue1)); - assertEquals(1000L, rqs.getLastSequenceId(region1, queue1)); - } - - protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { - // we can first check if the value was changed in the store, if it wasn't then fail right away - if (status != rp.getPeerStorage().isPeerEnabled(peerId)) { - fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK"); - } - while (true) { - if (status == rp.getPeer(peerId).isPeerEnabled()) { - return; - } - if (zkTimeoutCount < ZK_MAX_COUNT) { - LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status - + ", sleeping and trying again."); - Thread.sleep(ZK_SLEEP_INTERVAL); - } else { - fail("Timed out waiting for ConnectedPeerStatus to be " + status); - } - } - } - - protected void assertNumberOfPeers(int total) throws ReplicationException { - assertEquals(total, rp.getPeerStorage().listPeerIds().size()); - } - - /* - * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2, - * 3, 4, 5 log files respectively - */ - protected void populateQueues() throws ReplicationException { - rqs.addWAL(server1, "trash", "trash"); - rqs.removeQueue(server1, "trash"); - - rqs.addWAL(server2, "qId1", "trash"); - rqs.removeWAL(server2, "qId1", "trash"); - - for (int i = 1; i < 6; i++) { - for (int j = 0; j < i; j++) { - rqs.addWAL(server3, "qId" + i, "filename" + j); - } - // Add peers for the corresponding queues so they are not orphans - rp.getPeerStorage().addPeer("qId" + i, - ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(), - true); - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java deleted file mode 100644 index 08178f4..0000000 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ClusterId; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseZKTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.zookeeper.ZKClusterId; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZKWatcher; -import org.apache.hadoop.hbase.zookeeper.ZNodePaths; -import org.apache.zookeeper.KeeperException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, MediumTests.class }) -public class TestReplicationStateZKImpl extends TestReplicationStateBasic { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class); - - private static Configuration conf; - private static HBaseZKTestingUtility utility; - private static ZKWatcher zkw; - private static String replicationZNode; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - utility = new HBaseZKTestingUtility(); - utility.startMiniZKCluster(); - conf = utility.getConfiguration(); - conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); - zkw = utility.getZooKeeperWatcher(); - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); - replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName); - KEY_ONE = initPeerClusterState("/hbase1"); - KEY_TWO = initPeerClusterState("/hbase2"); - } - - private static String initPeerClusterState(String baseZKNode) - throws IOException, KeeperException { - // Add a dummy region server and set up the cluster id - Configuration testConf = new Configuration(conf); - testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode); - ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null); - String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234"); - ZKUtil.createWithParents(zkw1, fakeRs); - ZKClusterId.setClusterId(zkw1, new ClusterId()); - return ZKConfig.getZooKeeperClusterKey(testConf); - } - - @Before - public void setUp() { - zkTimeoutCount = 0; - rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf); - rp = ReplicationFactory.getReplicationPeers(zkw, conf); - OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf); - } - - @After - public void tearDown() throws KeeperException, IOException { - ZKUtil.deleteNodeRecursively(zkw, replicationZNode); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - utility.shutdownMiniZKCluster(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java deleted file mode 100644 index 3290fb0..0000000 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java +++ /dev/null @@ -1,178 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import static java.util.stream.Collectors.toList; -import static java.util.stream.Collectors.toSet; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.stream.Stream; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseZKTestingUtility; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, MediumTests.class }) -public class TestZKReplicationPeerStorage { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class); - - private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); - - private static ZKReplicationPeerStorage STORAGE; - - @BeforeClass - public static void setUp() throws Exception { - UTIL.startMiniZKCluster(); - STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); - } - - @AfterClass - public static void tearDown() throws IOException { - UTIL.shutdownMiniZKCluster(); - } - - private Set<String> randNamespaces(Random rand) { - return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5)) - .collect(toSet()); - } - - private Map<TableName, List<String>> randTableCFs(Random rand) { - int size = rand.nextInt(5); - Map<TableName, List<String>> map = new HashMap<>(); - for (int i = 0; i < size; i++) { - TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong())); - List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong())) - .limit(rand.nextInt(5)).collect(toList()); - map.put(tn, cfs); - } - return map; - } - - private ReplicationPeerConfig getConfig(int seed) { - Random rand = new Random(seed); - return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong())) - .setReplicationEndpointImpl(Long.toHexString(rand.nextLong())) - .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand)) - .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean()) - .setBandwidth(rand.nextInt(1000)).build(); - } - - private void assertSetEquals(Set<String> expected, Set<String> actual) { - if (expected == null || expected.size() == 0) { - assertTrue(actual == null || actual.size() == 0); - return; - } - assertEquals(expected.size(), actual.size()); - expected.forEach(s -> assertTrue(actual.contains(s))); - } - - private void assertMapEquals(Map<TableName, List<String>> expected, - Map<TableName, List<String>> actual) { - if (expected == null || expected.size() == 0) { - assertTrue(actual == null || actual.size() == 0); - return; - } - assertEquals(expected.size(), actual.size()); - expected.forEach((expectedTn, expectedCFs) -> { - List<String> actualCFs = actual.get(expectedTn); - if (expectedCFs == null || expectedCFs.size() == 0) { - assertTrue(actual.containsKey(expectedTn)); - assertTrue(actualCFs == null || actualCFs.size() == 0); - } else { - assertNotNull(actualCFs); - assertEquals(expectedCFs.size(), actualCFs.size()); - for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator(); - expectedIt.hasNext();) { - assertEquals(expectedIt.next(), actualIt.next()); - } - } - }); - } - - private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) { - assertEquals(expected.getClusterKey(), actual.getClusterKey()); - assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl()); - assertSetEquals(expected.getNamespaces(), actual.getNamespaces()); - assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces()); - assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap()); - assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap()); - assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables()); - assertEquals(expected.getBandwidth(), actual.getBandwidth()); - } - - @Test - public void test() throws ReplicationException { - int peerCount = 10; - for (int i = 0; i < peerCount; i++) { - STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0); - } - List<String> peerIds = STORAGE.listPeerIds(); - assertEquals(peerCount, peerIds.size()); - for (String peerId : peerIds) { - int seed = Integer.parseInt(peerId); - assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId)); - } - for (int i = 0; i < peerCount; i++) { - STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1)); - } - for (String peerId : peerIds) { - int seed = Integer.parseInt(peerId); - assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId)); - } - for (int i = 0; i < peerCount; i++) { - assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i))); - } - for (int i = 0; i < peerCount; i++) { - STORAGE.setPeerState(Integer.toString(i), i % 2 != 0); - } - for (int i = 0; i < peerCount; i++) { - assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i))); - } - String toRemove = Integer.toString(peerCount / 2); - STORAGE.removePeer(toRemove); - peerIds = STORAGE.listPeerIds(); - assertEquals(peerCount - 1, peerIds.size()); - assertFalse(peerIds.contains(toRemove)); - - try { - STORAGE.getPeerConfig(toRemove); - fail("Should throw a ReplicationException when get peer config of a peerId"); - } catch (ReplicationException e) { - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/31978c31/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java ---------------------------------------------------------------------- diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java deleted file mode 100644 index 8ff52f3..0000000 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java +++ /dev/null @@ -1,252 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.replication; - -import static org.hamcrest.CoreMatchers.hasItems; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.SortedSet; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseZKTestingUtility; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.zookeeper.KeeperException; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ ReplicationTests.class, MediumTests.class }) -public class TestZKReplicationQueueStorage { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class); - - private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility(); - - private static ZKReplicationQueueStorage STORAGE; - - @BeforeClass - public static void setUp() throws Exception { - UTIL.startMiniZKCluster(); - STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()); - } - - @AfterClass - public static void tearDown() throws IOException { - UTIL.shutdownMiniZKCluster(); - } - - @After - public void tearDownAfterTest() throws ReplicationException { - for (ServerName serverName : STORAGE.getListOfReplicators()) { - for (String queue : STORAGE.getAllQueues(serverName)) { - STORAGE.removeQueue(serverName, queue); - } - STORAGE.removeReplicatorIfQueueIsEmpty(serverName); - } - for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) { - STORAGE.removePeerFromHFileRefs(peerId); - } - } - - private ServerName getServerName(int i) { - return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i); - } - - @Test - public void testReplicator() throws ReplicationException { - assertTrue(STORAGE.getListOfReplicators().isEmpty()); - String queueId = "1"; - for (int i = 0; i < 10; i++) { - STORAGE.addWAL(getServerName(i), queueId, "file" + i); - } - List<ServerName> replicators = STORAGE.getListOfReplicators(); - assertEquals(10, replicators.size()); - for (int i = 0; i < 10; i++) { - assertThat(replicators, hasItems(getServerName(i))); - } - for (int i = 0; i < 5; i++) { - STORAGE.removeQueue(getServerName(i), queueId); - } - for (int i = 0; i < 10; i++) { - STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i)); - } - replicators = STORAGE.getListOfReplicators(); - assertEquals(5, replicators.size()); - for (int i = 5; i < 10; i++) { - assertThat(replicators, hasItems(getServerName(i))); - } - } - - private String getFileName(String base, int i) { - return String.format(base + "-%04d", i); - } - - @Test - public void testAddRemoveLog() throws ReplicationException { - ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); - assertTrue(STORAGE.getAllQueues(serverName1).isEmpty()); - String queue1 = "1"; - String queue2 = "2"; - for (int i = 0; i < 10; i++) { - STORAGE.addWAL(serverName1, queue1, getFileName("file1", i)); - STORAGE.addWAL(serverName1, queue2, getFileName("file2", i)); - } - List<String> queueIds = STORAGE.getAllQueues(serverName1); - assertEquals(2, queueIds.size()); - assertThat(queueIds, hasItems("1", "2")); - - List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1); - List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2); - assertEquals(10, wals1.size()); - assertEquals(10, wals2.size()); - for (int i = 0; i < 10; i++) { - assertThat(wals1, hasItems(getFileName("file1", i))); - assertThat(wals2, hasItems(getFileName("file2", i))); - } - - for (int i = 0; i < 10; i++) { - assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); - assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); - STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null); - STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10, - null); - } - - for (int i = 0; i < 10; i++) { - assertEquals((i + 1) * 100, - STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i))); - assertEquals((i + 1) * 100 + 10, - STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i))); - } - - for (int i = 0; i < 10; i++) { - if (i % 2 == 0) { - STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i)); - } else { - STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i)); - } - } - - queueIds = STORAGE.getAllQueues(serverName1); - assertEquals(2, queueIds.size()); - assertThat(queueIds, hasItems("1", "2")); - - ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); - Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2); - - assertEquals("1-" + serverName1.getServerName(), peer1.getFirst()); - assertEquals(5, peer1.getSecond().size()); - int i = 1; - for (String wal : peer1.getSecond()) { - assertEquals(getFileName("file1", i), wal); - assertEquals((i + 1) * 100, - STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i))); - i += 2; - } - - queueIds = STORAGE.getAllQueues(serverName1); - assertEquals(1, queueIds.size()); - assertThat(queueIds, hasItems("2")); - wals2 = STORAGE.getWALsInQueue(serverName1, queue2); - assertEquals(5, wals2.size()); - for (i = 0; i < 10; i += 2) { - assertThat(wals2, hasItems(getFileName("file2", i))); - } - - queueIds = STORAGE.getAllQueues(serverName2); - assertEquals(1, queueIds.size()); - assertThat(queueIds, hasItems(peer1.getFirst())); - wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst()); - assertEquals(5, wals1.size()); - for (i = 1; i < 10; i += 2) { - assertThat(wals1, hasItems(getFileName("file1", i))); - } - - Set<String> allWals = STORAGE.getAllWALs(); - assertEquals(10, allWals.size()); - for (i = 0; i < 10; i++) { - assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i))); - } - } - - // For HBASE-12865 - @Test - public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException { - ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); - STORAGE.addWAL(serverName1, "1", "file"); - - int v0 = STORAGE.getQueuesZNodeCversion(); - ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); - STORAGE.claimQueue(serverName1, "1", serverName2); - int v1 = STORAGE.getQueuesZNodeCversion(); - // cversion should increase by 1 since a child node is deleted - assertEquals(1, v1 - v0); - } - - private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException { - return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) { - - private int called = 0; - - @Override - protected int getQueuesZNodeCversion() throws KeeperException { - if (called < 4) { - called++; - } - return called; - } - }; - } - - @Test - public void testGetAllWALsCversionChange() throws IOException, ReplicationException { - ZKReplicationQueueStorage storage = createWithUnstableCversion(); - storage.addWAL(getServerName(0), "1", "file"); - // This should return eventually when cversion stabilizes - Set<String> allWals = storage.getAllWALs(); - assertEquals(1, allWals.size()); - assertThat(allWals, hasItems("file")); - } - - // For HBASE-14621 - @Test - public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException { - ZKReplicationQueueStorage storage = createWithUnstableCversion(); - storage.addPeerToHFileRefs("1"); - Path p = new Path("/test"); - storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p))); - // This should return eventually when cversion stabilizes - Set<String> allHFileRefs = storage.getAllHFileRefs(); - assertEquals(1, allHFileRefs.size()); - assertThat(allHFileRefs, hasItems("test")); - } -}