Apache9 commented on code in PR #4810: URL: https://github.com/apache/hbase/pull/4810#discussion_r1014650889
########## hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java: ########## @@ -294,80 +346,117 @@ public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce return sb.toString(); } - public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception { - ReplicationQueueStorage queueStorage; + public String dumpQueues(Connection connection, Set<String> peerIds, boolean hdfs, + Configuration conf) throws Exception { StringBuilder sb = new StringBuilder(); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(connection, conf); + + Set<ServerName> liveRegionServers = + connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet(); - // queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); - // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, - // zkw.getZNodePaths().rsZNode) - // .stream().map(ServerName::parseServerName).collect(Collectors.toSet()); - // - // Loops each peer on each RS and dumps the queues - // List<ServerName> regionservers = queueStorage.getListOfReplicators(); - // if (regionservers == null || regionservers.isEmpty()) { - // return sb.toString(); - // } - // for (ServerName regionserver : regionservers) { - // List<String> queueIds = queueStorage.getAllQueues(regionserver); - // if (!liveRegionServers.contains(regionserver)) { - // deadRegionServers.add(regionserver.getServerName()); - // } - // for (String queueId : queueIds) { - // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); - // Collections.sort(wals); - // if (!peerIds.contains(queueInfo.getPeerId())) { - // deletedQueues.add(regionserver + "/" + queueId); - // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); - // } else { - // 
sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); - // } - // } - // } + List<ServerName> regionServers = queueStorage.listAllReplicators(); + if (regionServers == null || regionServers.isEmpty()) { + return sb.toString(); + } + for (ServerName regionServer : regionServers) { + List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(regionServer); + + if (!liveRegionServers.contains(regionServer)) { + deadRegionServers.add(regionServer.getServerName()); + } + for (ReplicationQueueId queueId : queueIds) { + // wals + List<String> tmpWals = AbstractFSWALProvider + .getWALFiles(connection.getConfiguration(), + URLEncoder.encode(queueId.getServerWALsBelongTo().toString(), + StandardCharsets.UTF_8.name())) + .stream().map(Path::toString).collect(Collectors.toList()); + + // old wals + tmpWals.addAll(AbstractFSWALProvider + .getArchivedWALFiles(connection.getConfiguration(), queueId.getServerWALsBelongTo(), + URLEncoder.encode(queueId.getServerWALsBelongTo().toString(), + StandardCharsets.UTF_8.name())) + .stream().map(Path::toString).collect(Collectors.toList())); + + Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId); + // filter out the wal files that should replicate + List<String> wals = new ArrayList<>(); + for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) { + ReplicationGroupOffset offset = entry.getValue(); + for (String wal : tmpWals) { + if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) { + wals.add(wal); + } + } + // only for test + if (tmpWals.isEmpty() && !wals.contains(offset.getWal())) { Review Comment: What does this mean? 
########## hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java: ########## @@ -294,80 +346,117 @@ public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce return sb.toString(); } - public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception { - ReplicationQueueStorage queueStorage; + public String dumpQueues(Connection connection, Set<String> peerIds, boolean hdfs, + Configuration conf) throws Exception { StringBuilder sb = new StringBuilder(); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(connection, conf); + + Set<ServerName> liveRegionServers = + connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet(); - // queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); - // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, - // zkw.getZNodePaths().rsZNode) - // .stream().map(ServerName::parseServerName).collect(Collectors.toSet()); - // - // Loops each peer on each RS and dumps the queues - // List<ServerName> regionservers = queueStorage.getListOfReplicators(); - // if (regionservers == null || regionservers.isEmpty()) { - // return sb.toString(); - // } - // for (ServerName regionserver : regionservers) { - // List<String> queueIds = queueStorage.getAllQueues(regionserver); - // if (!liveRegionServers.contains(regionserver)) { - // deadRegionServers.add(regionserver.getServerName()); - // } - // for (String queueId : queueIds) { - // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); - // Collections.sort(wals); - // if (!peerIds.contains(queueInfo.getPeerId())) { - // deletedQueues.add(regionserver + "/" + queueId); - // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); - // } else { - // 
sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); - // } - // } - // } + List<ServerName> regionServers = queueStorage.listAllReplicators(); + if (regionServers == null || regionServers.isEmpty()) { + return sb.toString(); + } + for (ServerName regionServer : regionServers) { + List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(regionServer); + + if (!liveRegionServers.contains(regionServer)) { + deadRegionServers.add(regionServer.getServerName()); + } + for (ReplicationQueueId queueId : queueIds) { + // wals + List<String> tmpWals = AbstractFSWALProvider + .getWALFiles(connection.getConfiguration(), + URLEncoder.encode(queueId.getServerWALsBelongTo().toString(), + StandardCharsets.UTF_8.name())) + .stream().map(Path::toString).collect(Collectors.toList()); + + // old wals + tmpWals.addAll(AbstractFSWALProvider + .getArchivedWALFiles(connection.getConfiguration(), queueId.getServerWALsBelongTo(), + URLEncoder.encode(queueId.getServerWALsBelongTo().toString(), + StandardCharsets.UTF_8.name())) + .stream().map(Path::toString).collect(Collectors.toList())); + + Map<String, ReplicationGroupOffset> offsets = queueStorage.getOffsets(queueId); + // filter out the wal files that should replicate + List<String> wals = new ArrayList<>(); + for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) { + ReplicationGroupOffset offset = entry.getValue(); + for (String wal : tmpWals) { + if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) { + wals.add(wal); + } + } + // only for test + if (tmpWals.isEmpty() && !wals.contains(offset.getWal())) { + wals.add(offset.getWal()); + } + } + + Collections.sort(wals, Comparator.comparingLong(AbstractFSWALProvider::getTimestamp)); + if (!peerIds.contains(queueId.getPeerId())) { + deletedQueues.add(regionServer + "/" + queueId); + sb.append(formatQueue(regionServer, offsets, wals, queueId, true, hdfs)); + } else { + sb.append(formatQueue(regionServer, offsets, wals, 
queueId, false, hdfs)); + } + } + } return sb.toString(); } - private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage, - ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted, - boolean hdfs) throws Exception { + private String formatQueue(ServerName regionServer, Map<String, ReplicationGroupOffset> offsets, + List<String> wals, ReplicationQueueId queueId, boolean isDeleted, boolean hdfs) + throws Exception { StringBuilder sb = new StringBuilder(); - List<ServerName> deadServers; - - sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n"); - sb.append(" Queue znode: " + queueId + "\n"); - sb.append(" PeerID: " + queueInfo.getPeerId() + "\n"); - sb.append(" Recovered: " + queueInfo.isQueueRecovered() + "\n"); - deadServers = queueInfo.getDeadRegionServers(); - if (deadServers.isEmpty()) { - sb.append(" No dead RegionServers found in this queue." + "\n"); + sb.append("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n"); + sb.append(" Queue id: " + queueId + "\n"); + sb.append(" PeerID: " + queueId.getPeerId() + "\n"); + sb.append(" Recovered: " + queueId.isRecovered() + "\n"); + // In new version, we only record the first dead RegionServer in queueId. + if (queueId.getSourceServerName().isPresent()) { + sb.append(" Dead RegionServer: " + queueId.getSourceServerName().get() + "\n"); } else { - sb.append(" Dead RegionServers: " + deadServers + "\n"); + sb.append(" No dead RegionServer found in this queue." + "\n"); } sb.append(" Was deleted: " + isDeleted + "\n"); sb.append(" Number of WALs in replication queue: " + wals.size() + "\n"); - peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size()); - - for (String wal : wals) { - // long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal); - // sb.append(" Replication position for " + wal + ": " - // + (position > 0 ? 
position : "0" + " (not started or nothing to replicate)") + "\n"); + peersQueueSize.addAndGet(queueId.getPeerId(), wals.size()); + + for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) { + String walGroup = entry.getKey(); + ReplicationGroupOffset offset = entry.getValue(); + for (String wal : wals) { + if (offset.getWal().equals(wal)) { Review Comment: Why only log when equal? This means we will only log one file per group? ########## hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java: ########## @@ -294,80 +346,117 @@ public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce return sb.toString(); } - public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception { - ReplicationQueueStorage queueStorage; + public String dumpQueues(Connection connection, Set<String> peerIds, boolean hdfs, + Configuration conf) throws Exception { StringBuilder sb = new StringBuilder(); + ReplicationQueueStorage queueStorage = + ReplicationStorageFactory.getReplicationQueueStorage(connection, conf); + + Set<ServerName> liveRegionServers = + connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet(); - // queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); - // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, - // zkw.getZNodePaths().rsZNode) - // .stream().map(ServerName::parseServerName).collect(Collectors.toSet()); - // - // Loops each peer on each RS and dumps the queues - // List<ServerName> regionservers = queueStorage.getListOfReplicators(); - // if (regionservers == null || regionservers.isEmpty()) { - // return sb.toString(); - // } - // for (ServerName regionserver : regionservers) { - // List<String> queueIds = queueStorage.getAllQueues(regionserver); - // if (!liveRegionServers.contains(regionserver)) { - // deadRegionServers.add(regionserver.getServerName()); - // } - // for 
(String queueId : queueIds) { - // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); - // Collections.sort(wals); - // if (!peerIds.contains(queueInfo.getPeerId())) { - // deletedQueues.add(regionserver + "/" + queueId); - // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs)); - // } else { - // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs)); - // } - // } - // } + List<ServerName> regionServers = queueStorage.listAllReplicators(); + if (regionServers == null || regionServers.isEmpty()) { + return sb.toString(); + } + for (ServerName regionServer : regionServers) { + List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(regionServer); + + if (!liveRegionServers.contains(regionServer)) { + deadRegionServers.add(regionServer.getServerName()); + } + for (ReplicationQueueId queueId : queueIds) { + // wals + List<String> tmpWals = AbstractFSWALProvider + .getWALFiles(connection.getConfiguration(), + URLEncoder.encode(queueId.getServerWALsBelongTo().toString(), + StandardCharsets.UTF_8.name())) + .stream().map(Path::toString).collect(Collectors.toList()); + + // old wals + tmpWals.addAll(AbstractFSWALProvider Review Comment: tmpWals is constructed by a stream collector, so it may be an immutable list that is not safe to add entries to? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org