ndimiduk commented on code in PR #5865: URL: https://github.com/apache/hbase/pull/5865#discussion_r1604690948
########## hbase-common/src/main/java/org/apache/hadoop/hbase/util/ReservoirSample.java: ########## @@ -41,7 +41,7 @@ public class ReservoirSample<T> { private int n; public ReservoirSample(int k) { - Preconditions.checkArgument(k > 0, "negative sampling number(%d) is not allowed"); + Preconditions.checkArgument(k > 0, "negative sampling number(%s) is not allowed", k); Review Comment: Checkstyle warning cleanup? ########## hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java: ########## @@ -86,6 +86,14 @@ public class TestVerifyReplication extends TestReplicationBase { @Rule public TestName name = new TestName(); + @Override + protected String getClusterKey(HBaseTestingUtil util) throws Exception { + // TODO: VerifyReplication does not support connection uri yet, so here we need to use cluster Review Comment: Is there an issue filed for this TODO? ########## hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java: ########## @@ -283,4 +291,8 @@ public AsyncRegionServerAdmin getRegionServer() { return regionServer; } } + + public static void main(String[] args) { Review Comment: Do you still need this? ########## hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java: ########## @@ -3216,6 +3216,24 @@ public static <T> String safeGetAsStr(List<T> lst, int i) { } } + public String getRpcConnnectionURI() throws UnknownHostException { Review Comment: nit, return a URI? ########## hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistryURIFactory.java: ########## @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.URI; +import org.apache.commons.lang3.StringUtils; Review Comment: Sorry, I forget which dependencies we expose transitively. Should we be using a shaded version of this class? 
########## hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java: ########## @@ -166,8 +175,8 @@ public void onConfigurationChange(Configuration conf) { for (ReplicationPeerImpl peer : peerCache.values()) { try { peer.onConfigurationChange( - ReplicationUtils.getPeerClusterConfiguration(peer.getPeerConfig(), conf)); - } catch (ReplicationException e) { + ReplicationPeerConfigUtil.getPeerClusterConfiguration(conf, peer.getPeerConfig())); + } catch (IOException e) { LOG.warn("failed to reload configuration for peer {}", peer.getId(), e); Review Comment: Not your code, but should this failure be fatal? ########## hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java: ########## @@ -402,6 +407,57 @@ public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationExcepti queueStorage.removePeerFromHFileRefs(peerId); } + private void checkClusterKey(String clusterKey, ReplicationEndpoint endpoint) + throws DoNotRetryIOException { + if (endpoint != null && !(endpoint instanceof HBaseReplicationEndpoint)) { + return; + } + // Endpoints implementing HBaseReplicationEndpoint need to check cluster key + URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(clusterKey); + try { + if (connectionUri != null) { + ConnectionRegistryFactory.validate(connectionUri); + } else { + ZKConfig.validateClusterKey(clusterKey); + } + } catch (IOException e) { + throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); + } + if (endpoint != null && endpoint.canReplicateToSameCluster()) { + return; + } + // make sure we do not replicate to same cluster + String peerClusterId; + try { + if (connectionUri != null) { + // fetch cluster id through standard admin API + try (Connection conn = ConnectionFactory.createConnection(connectionUri, conf); + Admin admin = conn.getAdmin()) { + peerClusterId = + 
admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)).getClusterId(); + } + } else { + // Create the peer cluster config for get peer cluster id + Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey); + try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) { + peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher); + } + } + } catch (IOException | KeeperException e) { + // we just want to check whether we will replicate to the same cluster, so if we get an error + // while getting the cluster id of the peer cluster, it means we are not connecting to + // ourselves, as we are still alive. So here we just log the error and continue Review Comment: So it's not possible to get an IOException or KeeperException when communicating with ourselves? Is there a more precise means of ensuring that the cluster represented by this configuration is not ourselves? ########## hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtil.java: ########## @@ -3216,6 +3216,24 @@ public static <T> String safeGetAsStr(List<T> lst, int i) { } } + public String getRpcConnnectionURI() throws UnknownHostException { + return "hbase+rpc://" + MasterRegistry.getMasterAddr(conf); + } + + public String getZkConnectionURI() { Review Comment: And here. 
########## hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java: ########## @@ -402,6 +407,57 @@ public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationExcepti queueStorage.removePeerFromHFileRefs(peerId); } + private void checkClusterKey(String clusterKey, ReplicationEndpoint endpoint) + throws DoNotRetryIOException { + if (endpoint != null && !(endpoint instanceof HBaseReplicationEndpoint)) { + return; + } + // Endpoints implementing HBaseReplicationEndpoint need to check cluster key + URI connectionUri = ConnectionRegistryFactory.tryParseAsConnectionURI(clusterKey); + try { + if (connectionUri != null) { + ConnectionRegistryFactory.validate(connectionUri); + } else { + ZKConfig.validateClusterKey(clusterKey); + } + } catch (IOException e) { + throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); + } + if (endpoint != null && endpoint.canReplicateToSameCluster()) { + return; + } + // make sure we do not replicate to same cluster + String peerClusterId; + try { + if (connectionUri != null) { + // fetch cluster id through standard admin API + try (Connection conn = ConnectionFactory.createConnection(connectionUri, conf); + Admin admin = conn.getAdmin()) { + peerClusterId = + admin.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.CLUSTER_ID)).getClusterId(); Review Comment: Should we expose the clusterId as its own endpoint, or part of a more specific endpoint? Rummaging around in the metrics for this value seems like the wrong place to go looking. ########## hbase-testing-util/src/main/java/org/apache/hadoop/hbase/HBaseTestingUtility.java: ########## @@ -3528,6 +3528,24 @@ public static <T> String safeGetAsStr(List<T> lst, int i) { } } + public String getRpcConnnectionURI() throws UnknownHostException { Review Comment: Return URI? 
########## hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryURIFactory.java: ########## @@ -39,4 +39,10 @@ public interface ConnectionRegistryURIFactory { * {@link ConnectionRegistryFactory}. */ String getScheme(); + + /** + * Validate the given {@code uri}. + * @throws IOException if this is not a valid connection registry URI. + */ + void validate(URI uri) throws IOException; Review Comment: Maybe this interface should be `boolean isValid(URI)`, and then the caller is free to throw or not. ########## hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestGlobalReplicationThrottler.java: ########## @@ -158,35 +156,24 @@ public void testQuota() throws IOException { }); watcher.start(); - try (Table t1 = utility1.getConnection().getTable(tableName); - Table t2 = utility2.getConnection().getTable(tableName)) { + try (Table t1 = utility1.getConnection().getTable(tableName)) { for (int i = 0; i < 50; i++) { Put put = new Put(ROWS[i]); put.addColumn(famName, VALUE, VALUE); t1.put(put); } - long start = EnvironmentEdgeManager.currentTime(); - while (EnvironmentEdgeManager.currentTime() - start < 180000) { - Scan scan = new Scan(); - scan.setCaching(50); - int count = 0; - try (ResultScanner results = t2.getScanner(scan)) { - for (Result result : results) { - count++; - } - } - if (count < 50) { - LOG.info("Waiting all logs pushed to slave. Expected 50 , actual " + count); - Threads.sleep(200); - continue; - } - break; - } } + utility2.waitFor(180000, () -> { Review Comment: 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@hbase.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org