Apache9 commented on code in PR #4810:
URL: https://github.com/apache/hbase/pull/4810#discussion_r1014650889


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java:
##########
@@ -294,80 +346,117 @@ public String 
dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce
     return sb.toString();
   }
 
-  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) 
throws Exception {
-    ReplicationQueueStorage queueStorage;
+  public String dumpQueues(Connection connection, Set<String> peerIds, boolean 
hdfs,
+    Configuration conf) throws Exception {
     StringBuilder sb = new StringBuilder();
+    ReplicationQueueStorage queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);
+
+    Set<ServerName> liveRegionServers =
+      
connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet();
 
-    // queueStorage = 
ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
-    // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
-    // zkw.getZNodePaths().rsZNode)
-    // .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
-    //
-    // Loops each peer on each RS and dumps the queues
-    // List<ServerName> regionservers = queueStorage.getListOfReplicators();
-    // if (regionservers == null || regionservers.isEmpty()) {
-    // return sb.toString();
-    // }
-    // for (ServerName regionserver : regionservers) {
-    // List<String> queueIds = queueStorage.getAllQueues(regionserver);
-    // if (!liveRegionServers.contains(regionserver)) {
-    // deadRegionServers.add(regionserver.getServerName());
-    // }
-    // for (String queueId : queueIds) {
-    // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-    // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
-    // Collections.sort(wals);
-    // if (!peerIds.contains(queueInfo.getPeerId())) {
-    // deletedQueues.add(regionserver + "/" + queueId);
-    // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, 
wals, true, hdfs));
-    // } else {
-    // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, 
wals, false, hdfs));
-    // }
-    // }
-    // }
+    List<ServerName> regionServers = queueStorage.listAllReplicators();
+    if (regionServers == null || regionServers.isEmpty()) {
+      return sb.toString();
+    }
+    for (ServerName regionServer : regionServers) {
+      List<ReplicationQueueId> queueIds = 
queueStorage.listAllQueueIds(regionServer);
+
+      if (!liveRegionServers.contains(regionServer)) {
+        deadRegionServers.add(regionServer.getServerName());
+      }
+      for (ReplicationQueueId queueId : queueIds) {
+        // wals
+        List<String> tmpWals = AbstractFSWALProvider
+          .getWALFiles(connection.getConfiguration(),
+            URLEncoder.encode(queueId.getServerWALsBelongTo().toString(),
+              StandardCharsets.UTF_8.name()))
+          .stream().map(Path::toString).collect(Collectors.toList());
+
+        // old wals
+        tmpWals.addAll(AbstractFSWALProvider
+          .getArchivedWALFiles(connection.getConfiguration(), 
queueId.getServerWALsBelongTo(),
+            URLEncoder.encode(queueId.getServerWALsBelongTo().toString(),
+              StandardCharsets.UTF_8.name()))
+          .stream().map(Path::toString).collect(Collectors.toList()));
+
+        Map<String, ReplicationGroupOffset> offsets = 
queueStorage.getOffsets(queueId);
+        // filter out the wal files that should replicate
+        List<String> wals = new ArrayList<>();
+        for (Map.Entry<String, ReplicationGroupOffset> entry : 
offsets.entrySet()) {
+          ReplicationGroupOffset offset = entry.getValue();
+          for (String wal : tmpWals) {
+            if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) {
+              wals.add(wal);
+            }
+          }
+          // only for test
+          if (tmpWals.isEmpty() && !wals.contains(offset.getWal())) {

Review Comment:
   What does this mean?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java:
##########
@@ -294,80 +346,117 @@ public String 
dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce
     return sb.toString();
   }
 
-  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) 
throws Exception {
-    ReplicationQueueStorage queueStorage;
+  public String dumpQueues(Connection connection, Set<String> peerIds, boolean 
hdfs,
+    Configuration conf) throws Exception {
     StringBuilder sb = new StringBuilder();
+    ReplicationQueueStorage queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);
+
+    Set<ServerName> liveRegionServers =
+      
connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet();
 
-    // queueStorage = 
ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
-    // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
-    // zkw.getZNodePaths().rsZNode)
-    // .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
-    //
-    // Loops each peer on each RS and dumps the queues
-    // List<ServerName> regionservers = queueStorage.getListOfReplicators();
-    // if (regionservers == null || regionservers.isEmpty()) {
-    // return sb.toString();
-    // }
-    // for (ServerName regionserver : regionservers) {
-    // List<String> queueIds = queueStorage.getAllQueues(regionserver);
-    // if (!liveRegionServers.contains(regionserver)) {
-    // deadRegionServers.add(regionserver.getServerName());
-    // }
-    // for (String queueId : queueIds) {
-    // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-    // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
-    // Collections.sort(wals);
-    // if (!peerIds.contains(queueInfo.getPeerId())) {
-    // deletedQueues.add(regionserver + "/" + queueId);
-    // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, 
wals, true, hdfs));
-    // } else {
-    // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, 
wals, false, hdfs));
-    // }
-    // }
-    // }
+    List<ServerName> regionServers = queueStorage.listAllReplicators();
+    if (regionServers == null || regionServers.isEmpty()) {
+      return sb.toString();
+    }
+    for (ServerName regionServer : regionServers) {
+      List<ReplicationQueueId> queueIds = 
queueStorage.listAllQueueIds(regionServer);
+
+      if (!liveRegionServers.contains(regionServer)) {
+        deadRegionServers.add(regionServer.getServerName());
+      }
+      for (ReplicationQueueId queueId : queueIds) {
+        // wals
+        List<String> tmpWals = AbstractFSWALProvider
+          .getWALFiles(connection.getConfiguration(),
+            URLEncoder.encode(queueId.getServerWALsBelongTo().toString(),
+              StandardCharsets.UTF_8.name()))
+          .stream().map(Path::toString).collect(Collectors.toList());
+
+        // old wals
+        tmpWals.addAll(AbstractFSWALProvider
+          .getArchivedWALFiles(connection.getConfiguration(), 
queueId.getServerWALsBelongTo(),
+            URLEncoder.encode(queueId.getServerWALsBelongTo().toString(),
+              StandardCharsets.UTF_8.name()))
+          .stream().map(Path::toString).collect(Collectors.toList()));
+
+        Map<String, ReplicationGroupOffset> offsets = 
queueStorage.getOffsets(queueId);
+        // filter out the wal files that should replicate
+        List<String> wals = new ArrayList<>();
+        for (Map.Entry<String, ReplicationGroupOffset> entry : 
offsets.entrySet()) {
+          ReplicationGroupOffset offset = entry.getValue();
+          for (String wal : tmpWals) {
+            if (ReplicationOffsetUtil.shouldReplicate(offset, wal)) {
+              wals.add(wal);
+            }
+          }
+          // only for test
+          if (tmpWals.isEmpty() && !wals.contains(offset.getWal())) {
+            wals.add(offset.getWal());
+          }
+        }
+
+        Collections.sort(wals, 
Comparator.comparingLong(AbstractFSWALProvider::getTimestamp));
+        if (!peerIds.contains(queueId.getPeerId())) {
+          deletedQueues.add(regionServer + "/" + queueId);
+          sb.append(formatQueue(regionServer, offsets, wals, queueId, true, 
hdfs));
+        } else {
+          sb.append(formatQueue(regionServer, offsets, wals, queueId, false, 
hdfs));
+        }
+      }
+    }
     return sb.toString();
   }
 
-  private String formatQueue(ServerName regionserver, ReplicationQueueStorage 
queueStorage,
-    ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean 
isDeleted,
-    boolean hdfs) throws Exception {
+  private String formatQueue(ServerName regionServer, Map<String, 
ReplicationGroupOffset> offsets,
+    List<String> wals, ReplicationQueueId queueId, boolean isDeleted, boolean 
hdfs)
+    throws Exception {
     StringBuilder sb = new StringBuilder();
 
-    List<ServerName> deadServers;
-
-    sb.append("Dumping replication queue info for RegionServer: [" + 
regionserver + "]" + "\n");
-    sb.append("    Queue znode: " + queueId + "\n");
-    sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
-    sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
-    deadServers = queueInfo.getDeadRegionServers();
-    if (deadServers.isEmpty()) {
-      sb.append("    No dead RegionServers found in this queue." + "\n");
+    sb.append("Dumping replication queue info for RegionServer: [" + 
regionServer + "]" + "\n");
+    sb.append("    Queue id: " + queueId + "\n");
+    sb.append("    PeerID: " + queueId.getPeerId() + "\n");
+    sb.append("    Recovered: " + queueId.isRecovered() + "\n");
+    // In new version, we only record the first dead RegionServer in queueId.
+    if (queueId.getSourceServerName().isPresent()) {
+      sb.append("    Dead RegionServer: " + 
queueId.getSourceServerName().get() + "\n");
     } else {
-      sb.append("    Dead RegionServers: " + deadServers + "\n");
+      sb.append("    No dead RegionServer found in this queue." + "\n");
     }
     sb.append("    Was deleted: " + isDeleted + "\n");
     sb.append("    Number of WALs in replication queue: " + wals.size() + 
"\n");
-    peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
-
-    for (String wal : wals) {
-      // long position = queueStorage.getWALPosition(regionserver, 
queueInfo.getPeerId(), wal);
-      // sb.append(" Replication position for " + wal + ": "
-      // + (position > 0 ? position : "0" + " (not started or nothing to 
replicate)") + "\n");
+    peersQueueSize.addAndGet(queueId.getPeerId(), wals.size());
+
+    for (Map.Entry<String, ReplicationGroupOffset> entry : offsets.entrySet()) 
{
+      String walGroup = entry.getKey();
+      ReplicationGroupOffset offset = entry.getValue();
+      for (String wal : wals) {
+        if (offset.getWal().equals(wal)) {

Review Comment:
   Why only log when equal? This means we will only log one file per group?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java:
##########
@@ -294,80 +346,117 @@ public String 
dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce
     return sb.toString();
   }
 
-  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) 
throws Exception {
-    ReplicationQueueStorage queueStorage;
+  public String dumpQueues(Connection connection, Set<String> peerIds, boolean 
hdfs,
+    Configuration conf) throws Exception {
     StringBuilder sb = new StringBuilder();
+    ReplicationQueueStorage queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(connection, conf);
+
+    Set<ServerName> liveRegionServers =
+      
connection.getAdmin().getClusterMetrics().getLiveServerMetrics().keySet();
 
-    // queueStorage = 
ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
-    // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
-    // zkw.getZNodePaths().rsZNode)
-    // .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
-    //
-    // Loops each peer on each RS and dumps the queues
-    // List<ServerName> regionservers = queueStorage.getListOfReplicators();
-    // if (regionservers == null || regionservers.isEmpty()) {
-    // return sb.toString();
-    // }
-    // for (ServerName regionserver : regionservers) {
-    // List<String> queueIds = queueStorage.getAllQueues(regionserver);
-    // if (!liveRegionServers.contains(regionserver)) {
-    // deadRegionServers.add(regionserver.getServerName());
-    // }
-    // for (String queueId : queueIds) {
-    // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-    // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
-    // Collections.sort(wals);
-    // if (!peerIds.contains(queueInfo.getPeerId())) {
-    // deletedQueues.add(regionserver + "/" + queueId);
-    // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, 
wals, true, hdfs));
-    // } else {
-    // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, 
wals, false, hdfs));
-    // }
-    // }
-    // }
+    List<ServerName> regionServers = queueStorage.listAllReplicators();
+    if (regionServers == null || regionServers.isEmpty()) {
+      return sb.toString();
+    }
+    for (ServerName regionServer : regionServers) {
+      List<ReplicationQueueId> queueIds = 
queueStorage.listAllQueueIds(regionServer);
+
+      if (!liveRegionServers.contains(regionServer)) {
+        deadRegionServers.add(regionServer.getServerName());
+      }
+      for (ReplicationQueueId queueId : queueIds) {
+        // wals
+        List<String> tmpWals = AbstractFSWALProvider
+          .getWALFiles(connection.getConfiguration(),
+            URLEncoder.encode(queueId.getServerWALsBelongTo().toString(),
+              StandardCharsets.UTF_8.name()))
+          .stream().map(Path::toString).collect(Collectors.toList());
+
+        // old wals
+        tmpWals.addAll(AbstractFSWALProvider

Review Comment:
   The tmpWals list is constructed by a stream collector, so it is not safe to add entries
to it, since the returned list may be immutable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to