github-advanced-security[bot] commented on code in PR #23213:
URL: https://github.com/apache/pulsar/pull/23213#discussion_r1728730868
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##########
@@ -1160,4 +1164,121 @@
admin1.namespaces().deleteNamespace(ns);
admin2.namespaces().deleteNamespace(ns);
}
+
+ @Test
+ public void testReplicationCountMetrics() throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ nonReplicatedNamespace + "/tp_");
+ // 1.Create topic, does not enable replication now.
+ admin1.topics().createNonPartitionedTopic(topicName);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+ // We inject an error to make the internal producer fail to connect.
+ final AtomicInteger createProducerCounter = new AtomicInteger();
+ final AtomicBoolean failedCreateProducer = new AtomicBoolean(true);
+ Runnable taskToClearInjection =
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
+ if (topicName.equals(producerCnf.getTopicName())) {
+ // There is a switch to determine create producer successfully
or not.
+ if (failedCreateProducer.get()) {
+ log.info("Retry create replicator.producer count: {}",
createProducerCounter);
+ // Release producer and fail callback.
+ originalProducer.closeAsync();
+ throw new RuntimeException("mock error");
+ }
+ return originalProducer;
+ }
+ return originalProducer;
+ });
+
+ // 2.Enable replication.
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1, cluster2));
+
+ // Verify: metrics.
+ // Cluster level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Namespace level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Topic level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ JerseyClient httpClient = JerseyClientBuilder.createClient();
+ Awaitility.await().untilAsserted(() -> {
+ int topicConnected = 0;
+ int topicDisconnected = 0;
+
+ String response =
httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
+ .request().get(String.class);
+ Multimap<String, PrometheusMetricsClient.Metric> metricMap =
PrometheusMetricsClient.parseMetrics(response);
+ if
(!metricMap.containsKey("pulsar_replication_disconnected_count")) {
+ fail("Expected 1 disconnected replicator.");
+ }
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_replication_connected_count")) {
+ if (cluster1.equals(metric.tags.get("cluster"))
+ &&
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+ && topicName.equals(metric.tags.get("topic"))) {
+ topicConnected += metric.value;
Review Comment:
## Implicit narrowing conversion in compound assignment
Implicit cast of source type double to narrower destination type [int](1).
[Show more details](https://github.com/apache/pulsar/security/code-scanning/137)
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##########
@@ -1160,4 +1164,121 @@
admin1.namespaces().deleteNamespace(ns);
admin2.namespaces().deleteNamespace(ns);
}
+
+ @Test
+ public void testReplicationCountMetrics() throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ nonReplicatedNamespace + "/tp_");
+ // 1.Create topic, does not enable replication now.
+ admin1.topics().createNonPartitionedTopic(topicName);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+ // We inject an error to make the internal producer fail to connect.
+ final AtomicInteger createProducerCounter = new AtomicInteger();
+ final AtomicBoolean failedCreateProducer = new AtomicBoolean(true);
+ Runnable taskToClearInjection =
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
+ if (topicName.equals(producerCnf.getTopicName())) {
+ // There is a switch to determine create producer successfully
or not.
+ if (failedCreateProducer.get()) {
+ log.info("Retry create replicator.producer count: {}",
createProducerCounter);
+ // Release producer and fail callback.
+ originalProducer.closeAsync();
+ throw new RuntimeException("mock error");
+ }
+ return originalProducer;
+ }
+ return originalProducer;
+ });
+
+ // 2.Enable replication.
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1, cluster2));
+
+ // Verify: metrics.
+ // Cluster level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Namespace level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Topic level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ JerseyClient httpClient = JerseyClientBuilder.createClient();
+ Awaitility.await().untilAsserted(() -> {
+ int topicConnected = 0;
+ int topicDisconnected = 0;
+
+ String response =
httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
+ .request().get(String.class);
+ Multimap<String, PrometheusMetricsClient.Metric> metricMap =
PrometheusMetricsClient.parseMetrics(response);
+ if
(!metricMap.containsKey("pulsar_replication_disconnected_count")) {
+ fail("Expected 1 disconnected replicator.");
+ }
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_replication_connected_count")) {
+ if (cluster1.equals(metric.tags.get("cluster"))
+ &&
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+ && topicName.equals(metric.tags.get("topic"))) {
+ topicConnected += metric.value;
+ }
+ }
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_replication_disconnected_count")) {
+ if (cluster1.equals(metric.tags.get("cluster"))
+ &&
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+ && topicName.equals(metric.tags.get("topic"))) {
+ topicDisconnected += metric.value;
+ }
+ }
+ log.info("{}, {},", topicConnected, topicDisconnected);
+ assertEquals(topicConnected, 0);
+ assertEquals(topicDisconnected, 1);
+ });
+
+ // Let replicator connect successfully.
+ failedCreateProducer.set(false);
+ // Verify: metrics.
+ // Cluster level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Namespace level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Topic level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ Awaitility.await().atMost(Duration.ofSeconds(130)).untilAsserted(() ->
{
+ int topicConnected = 0;
+ int topicDisconnected = 0;
+
+ String response =
httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
+ .request().get(String.class);
+ Multimap<String, PrometheusMetricsClient.Metric> metricMap =
PrometheusMetricsClient.parseMetrics(response);
+ if
(!metricMap.containsKey("pulsar_replication_disconnected_count")) {
+ fail("Expected 1 disconnected replicator.");
+ }
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_replication_connected_count")) {
+ if (cluster1.equals(metric.tags.get("cluster"))
+ &&
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+ && topicName.equals(metric.tags.get("topic"))) {
+ topicConnected += metric.value;
+ }
+ }
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_replication_disconnected_count")) {
+ if (cluster1.equals(metric.tags.get("cluster"))
+ &&
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+ && topicName.equals(metric.tags.get("topic"))) {
+ topicDisconnected += metric.value;
Review Comment:
## Implicit narrowing conversion in compound assignment
Implicit cast of source type double to narrower destination type [int](1).
[Show more details](https://github.com/apache/pulsar/security/code-scanning/140)
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##########
@@ -1160,4 +1164,121 @@
admin1.namespaces().deleteNamespace(ns);
admin2.namespaces().deleteNamespace(ns);
}
+
+ @Test
+ public void testReplicationCountMetrics() throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ nonReplicatedNamespace + "/tp_");
+ // 1.Create topic, does not enable replication now.
+ admin1.topics().createNonPartitionedTopic(topicName);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+ // We inject an error to make the internal producer fail to connect.
+ final AtomicInteger createProducerCounter = new AtomicInteger();
+ final AtomicBoolean failedCreateProducer = new AtomicBoolean(true);
+ Runnable taskToClearInjection =
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
+ if (topicName.equals(producerCnf.getTopicName())) {
+ // There is a switch to determine create producer successfully
or not.
+ if (failedCreateProducer.get()) {
+ log.info("Retry create replicator.producer count: {}",
createProducerCounter);
+ // Release producer and fail callback.
+ originalProducer.closeAsync();
+ throw new RuntimeException("mock error");
+ }
+ return originalProducer;
+ }
+ return originalProducer;
+ });
+
+ // 2.Enable replication.
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1, cluster2));
+
+ // Verify: metrics.
+ // Cluster level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Namespace level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Topic level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ JerseyClient httpClient = JerseyClientBuilder.createClient();
+ Awaitility.await().untilAsserted(() -> {
+ int topicConnected = 0;
+ int topicDisconnected = 0;
+
+ String response =
httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
+ .request().get(String.class);
+ Multimap<String, PrometheusMetricsClient.Metric> metricMap =
PrometheusMetricsClient.parseMetrics(response);
+ if
(!metricMap.containsKey("pulsar_replication_disconnected_count")) {
+ fail("Expected 1 disconnected replicator.");
+ }
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_replication_connected_count")) {
+ if (cluster1.equals(metric.tags.get("cluster"))
+ &&
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+ && topicName.equals(metric.tags.get("topic"))) {
+ topicConnected += metric.value;
+ }
+ }
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_replication_disconnected_count")) {
+ if (cluster1.equals(metric.tags.get("cluster"))
+ &&
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+ && topicName.equals(metric.tags.get("topic"))) {
+ topicDisconnected += metric.value;
+ }
+ }
+ log.info("{}, {},", topicConnected, topicDisconnected);
+ assertEquals(topicConnected, 0);
+ assertEquals(topicDisconnected, 1);
+ });
+
+ // Let replicator connect successfully.
+ failedCreateProducer.set(false);
+ // Verify: metrics.
+ // Cluster level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Namespace level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Topic level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ Awaitility.await().atMost(Duration.ofSeconds(130)).untilAsserted(() ->
{
+ int topicConnected = 0;
+ int topicDisconnected = 0;
+
+ String response =
httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
+ .request().get(String.class);
+ Multimap<String, PrometheusMetricsClient.Metric> metricMap =
PrometheusMetricsClient.parseMetrics(response);
+ if
(!metricMap.containsKey("pulsar_replication_disconnected_count")) {
+ fail("Expected 1 disconnected replicator.");
+ }
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_replication_connected_count")) {
+ if (cluster1.equals(metric.tags.get("cluster"))
+ &&
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+ && topicName.equals(metric.tags.get("topic"))) {
+ topicConnected += metric.value;
Review Comment:
## Implicit narrowing conversion in compound assignment
Implicit cast of source type double to narrower destination type [int](1).
[Show more details](https://github.com/apache/pulsar/security/code-scanning/139)
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##########
@@ -1160,4 +1164,121 @@
admin1.namespaces().deleteNamespace(ns);
admin2.namespaces().deleteNamespace(ns);
}
+
+ @Test
+ public void testReplicationCountMetrics() throws Exception {
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://"
+ nonReplicatedNamespace + "/tp_");
+ // 1.Create topic, does not enable replication now.
+ admin1.topics().createNonPartitionedTopic(topicName);
+ PersistentTopic persistentTopic =
+ (PersistentTopic)
pulsar1.getBrokerService().getTopic(topicName, false).join().get();
+
+ // We inject an error to make the internal producer fail to connect.
+ final AtomicInteger createProducerCounter = new AtomicInteger();
+ final AtomicBoolean failedCreateProducer = new AtomicBoolean(true);
+ Runnable taskToClearInjection =
injectMockReplicatorProducerBuilder((producerCnf, originalProducer) -> {
+ if (topicName.equals(producerCnf.getTopicName())) {
+ // There is a switch to determine create producer successfully
or not.
+ if (failedCreateProducer.get()) {
+ log.info("Retry create replicator.producer count: {}",
createProducerCounter);
+ // Release producer and fail callback.
+ originalProducer.closeAsync();
+ throw new RuntimeException("mock error");
+ }
+ return originalProducer;
+ }
+ return originalProducer;
+ });
+
+ // 2.Enable replication.
+ admin1.topics().setReplicationClusters(topicName,
Arrays.asList(cluster1, cluster2));
+
+ // Verify: metrics.
+ // Cluster level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Namespace level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ // Topic level:
+ // - pulsar_replication_connected_count
+ // - pulsar_replication_disconnected_count
+ JerseyClient httpClient = JerseyClientBuilder.createClient();
+ Awaitility.await().untilAsserted(() -> {
+ int topicConnected = 0;
+ int topicDisconnected = 0;
+
+ String response =
httpClient.target(pulsar1.getWebServiceAddress()).path("/metrics/")
+ .request().get(String.class);
+ Multimap<String, PrometheusMetricsClient.Metric> metricMap =
PrometheusMetricsClient.parseMetrics(response);
+ if
(!metricMap.containsKey("pulsar_replication_disconnected_count")) {
+ fail("Expected 1 disconnected replicator.");
+ }
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_replication_connected_count")) {
+ if (cluster1.equals(metric.tags.get("cluster"))
+ &&
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+ && topicName.equals(metric.tags.get("topic"))) {
+ topicConnected += metric.value;
+ }
+ }
+ for (PrometheusMetricsClient.Metric metric :
metricMap.get("pulsar_replication_disconnected_count")) {
+ if (cluster1.equals(metric.tags.get("cluster"))
+ &&
nonReplicatedNamespace.equals(metric.tags.get("namespace"))
+ && topicName.equals(metric.tags.get("topic"))) {
+ topicDisconnected += metric.value;
Review Comment:
## Implicit narrowing conversion in compound assignment
Implicit cast of source type double to narrower destination type [int](1).
[Show more details](https://github.com/apache/pulsar/security/code-scanning/138)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]