This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 38fae22588cc0041ad6794e6dd9d275ddd31f8bd Author: feynmanlin <[email protected]> AuthorDate: Wed Jan 20 01:02:52 2021 +0800 [flaky test] Fix unit tests that occasionally fail (#9226) * fix unit test * fix unit test * fix unit test (cherry picked from commit 63acd204914734c7098e5b7fb9e6fb3ee37810a7) --- .../broker/admin/IncrementPartitionsTest.java | 13 ++- .../pulsar/broker/admin/TopicMessageTTLTest.java | 1 + .../pulsar/broker/admin/v1/V1_AdminApiTest2.java | 1 + .../broker/service/ConsumedLedgersTrimTest.java | 7 +- .../service/MessagePublishBufferThrottleTest.java | 1 + .../broker/service/ReplicatorRateLimiterTest.java | 3 + .../pulsar/broker/service/ReplicatorTestBase.java | 129 +++++++++++++-------- .../service/persistent/TopicDuplicationTest.java | 28 ++--- 8 files changed, 113 insertions(+), 70 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java index 84fa300..0f78afa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java @@ -20,6 +20,8 @@ package org.apache.pulsar.broker.admin; import static org.testng.Assert.assertEquals; +import java.util.concurrent.TimeUnit; + import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Consumer; @@ -30,6 +32,7 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -119,13 +122,17 @@ public class IncrementPartitionsTest extends MockedPulsarServiceBaseTest { .create(); admin.topics().updatePartitionedTopic(partitionedTopicName, 2); - assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2); + //zk update takes some time + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2)); admin.topics().updatePartitionedTopic(partitionedTopicName, 10); - assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 10)); admin.topics().updatePartitionedTopic(partitionedTopicName, 20); - assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20); + Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> + assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 20)); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java index 7bb93d7..e0eb52b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicMessageTTLTest.java @@ -49,6 +49,7 @@ public class TopicMessageTTLTest extends MockedPulsarServiceBaseTest { @BeforeMethod @Override protected void setup() throws Exception { + resetConfig(); this.conf.setSystemTopicEnabled(true); this.conf.setTopicLevelPoliciesEnabled(true); this.conf.setTtlDurationDefaultInSeconds(3600); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java index e797eb8..6d4148e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java @@ -83,6 +83,7 @@ public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest { @BeforeMethod @Override public void setup() throws Exception { + resetConfig(); conf.setLoadBalancerEnabled(true); super.internalSetup(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java index cc84de2..c06126b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java @@ -36,6 +36,8 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.MessageId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; public class ConsumedLedgersTrimTest extends BrokerTestBase { @@ -46,9 +48,10 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase { //No-op } + @AfterMethod @Override protected void cleanup() throws Exception { - //No-op + super.internalCleanup(); } @Test @@ -102,7 +105,7 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase { @Test - public void TestConsumedLedgersTrimNoSubscriptions() throws Exception { + public void testConsumedLedgersTrimNoSubscriptions() throws Exception { conf.setRetentionCheckIntervalInSeconds(1); conf.setBrokerDeleteInactiveTopicsEnabled(false); super.baseSetup(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java index 6dd4515..75248f4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessagePublishBufferThrottleTest.java @@ -43,6 +43,7 @@ public class MessagePublishBufferThrottleTest extends BrokerTestBase { @Override protected void cleanup() throws Exception { super.internalCleanup(); + resetConfig(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java index 4c04ccf..3cb2247 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.java @@ -63,6 +63,9 @@ public class ReplicatorRateLimiterTest extends ReplicatorTestBase { @AfterClass(alwaysRun = true, timeOut = 300000) void shutdown() throws Exception { super.shutdown(); + resetConfig1(); + resetConfig2(); + resetConfig3(); } enum DispatchRateType { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index 9f7ef09..38ed451 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -110,23 +110,7 @@ public class ReplicatorTestBase { // NOTE: we have to instantiate a new copy of System.getProperties() to make sure pulsar1 and pulsar2 have // completely // independent config objects instead of referring to the same properties object - config1.setClusterName("r1"); - config1.setAdvertisedAddress("localhost"); - config1.setWebServicePort(Optional.of(0)); - config1.setWebServicePortTls(Optional.of(0)); - config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort()); - config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo"); - config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); - config1.setBrokerDeleteInactiveTopicsFrequencySeconds( - inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); - config1.setBrokerServicePort(Optional.of(0)); - config1.setBrokerServicePortTls(Optional.of(0)); - config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); - config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); - config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); - config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); - config1.setDefaultNumberOfNamespaceBundles(1); - config1.setAllowAutoTopicCreationType("non-partitioned"); + setConfig1DefaultValue(); pulsar1 = new PulsarService(config1); pulsar1.start(); ns1 = pulsar1.getBrokerService(); @@ -141,23 +125,7 @@ public class ReplicatorTestBase { bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> 0); bkEnsemble2.start(); - config2.setClusterName("r2"); - config2.setAdvertisedAddress("localhost"); - config2.setWebServicePort(Optional.of(0)); - config2.setWebServicePortTls(Optional.of(0)); - config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort()); - config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo"); - config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); - config2.setBrokerDeleteInactiveTopicsFrequencySeconds( - inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); - config2.setBrokerServicePort(Optional.of(0)); - config2.setBrokerServicePortTls(Optional.of(0)); - config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); - config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); - config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); - config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); - config2.setDefaultNumberOfNamespaceBundles(1); - config2.setAllowAutoTopicCreationType("non-partitioned"); + setConfig2DefaultValue(); pulsar2 = new PulsarService(config2); pulsar2.start(); ns2 = pulsar2.getBrokerService(); @@ -172,23 +140,7 @@ public class ReplicatorTestBase { bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> 0); bkEnsemble3.start(); - config3.setClusterName("r3"); - config3.setAdvertisedAddress("localhost"); - config3.setWebServicePort(Optional.of(0)); - config3.setWebServicePortTls(Optional.of(0)); - config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort()); - config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo"); - config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); - config3.setBrokerDeleteInactiveTopicsFrequencySeconds( - inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); - config3.setBrokerServicePort(Optional.of(0)); - config3.setBrokerServicePortTls(Optional.of(0)); - config3.setTlsEnabled(true); - config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); - config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); - config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); - config3.setDefaultNumberOfNamespaceBundles(1); - config3.setAllowAutoTopicCreationType("non-partitioned"); + setConfig3DefaultValue(); pulsar3 = new PulsarService(config3); pulsar3.start(); ns3 = pulsar3.getBrokerService(); @@ -227,6 +179,81 @@ public class ReplicatorTestBase { } + private void setConfig3DefaultValue() { + config3.setClusterName("r3"); + config3.setAdvertisedAddress("localhost"); + config3.setWebServicePort(Optional.of(0)); + config3.setWebServicePortTls(Optional.of(0)); + config3.setZookeeperServers("127.0.0.1:" + bkEnsemble3.getZookeeperPort()); + config3.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo"); + config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); + config3.setBrokerDeleteInactiveTopicsFrequencySeconds( + inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); + config3.setBrokerServicePort(Optional.of(0)); + config3.setBrokerServicePortTls(Optional.of(0)); + config3.setTlsEnabled(true); + config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); + config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); + config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); + config3.setDefaultNumberOfNamespaceBundles(1); + config3.setAllowAutoTopicCreationType("non-partitioned"); + } + + public void setConfig1DefaultValue(){ + config1.setClusterName("r1"); + config1.setAdvertisedAddress("localhost"); + config1.setWebServicePort(Optional.of(0)); + config1.setWebServicePortTls(Optional.of(0)); + config1.setZookeeperServers("127.0.0.1:" + bkEnsemble1.getZookeeperPort()); + config1.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo"); + config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); + config1.setBrokerDeleteInactiveTopicsFrequencySeconds( + inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); + config1.setBrokerServicePort(Optional.of(0)); + config1.setBrokerServicePortTls(Optional.of(0)); + config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); + config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); + config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); + config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); + config1.setDefaultNumberOfNamespaceBundles(1); + config1.setAllowAutoTopicCreationType("non-partitioned"); + } + + public void setConfig2DefaultValue() { + config2.setClusterName("r2"); + config2.setAdvertisedAddress("localhost"); + config2.setWebServicePort(Optional.of(0)); + config2.setWebServicePortTls(Optional.of(0)); + config2.setZookeeperServers("127.0.0.1:" + bkEnsemble2.getZookeeperPort()); + config2.setConfigurationStoreServers("127.0.0.1:" + globalZkS.getZookeeperPort() + "/foo"); + config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic()); + config2.setBrokerDeleteInactiveTopicsFrequencySeconds( + inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); + config2.setBrokerServicePort(Optional.of(0)); + config2.setBrokerServicePortTls(Optional.of(0)); + config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH); + config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); + config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); + config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); + config2.setDefaultNumberOfNamespaceBundles(1); + config2.setAllowAutoTopicCreationType("non-partitioned"); + } + + public void resetConfig1() { + config1 = new ServiceConfiguration(); + setConfig1DefaultValue(); + } + + public void resetConfig2() { + config2 = new ServiceConfiguration(); + setConfig2DefaultValue(); + } + + public void resetConfig3() { + config3 = new ServiceConfiguration(); + setConfig3DefaultValue(); + } + private int inSec(int time, TimeUnit unit) { return (int) TimeUnit.SECONDS.convert(time, unit); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java index 397a6a5..fdea264 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.java @@ -106,13 +106,13 @@ public class TopicDuplicationTest extends ProducerConsumerBase { } @Test(timeOut = 30000) - private void testTopicPolicyTakeSnapshot() throws Exception { + public void testTopicPolicyTakeSnapshot() throws Exception { resetConfig(); conf.setSystemTopicEnabled(true); conf.setTopicLevelPoliciesEnabled(true); conf.setBrokerDeduplicationEnabled(true); conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1); - conf.setBrokerDeduplicationSnapshotIntervalSeconds(5); + conf.setBrokerDeduplicationSnapshotIntervalSeconds(7); conf.setBrokerDeduplicationEntriesInterval(20000); super.internalCleanup(); super.internalSetup(); @@ -124,10 +124,10 @@ public class TopicDuplicationTest extends ProducerConsumerBase { Producer<String> producer = pulsarClient .newProducer(Schema.STRING).topic(topicName).enableBatching(false).producerName(producerName).create(); waitCacheInit(topicName); - admin.topics().setDeduplicationSnapshotInterval(topicName, 1); - admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 2); + admin.topics().setDeduplicationSnapshotInterval(topicName, 3); + admin.namespaces().setDeduplicationSnapshotInterval(myNamespace, 5); - int msgNum = 50; + int msgNum = 10; CountDownLatch countDownLatch = new CountDownLatch(msgNum); for (int i = 0; i < msgNum; i++) { producer.newMessage().value("msg" + i).sendAsync().whenComplete((res, e) -> countDownLatch.countDown()); @@ -139,19 +139,19 @@ public class TopicDuplicationTest extends ProducerConsumerBase { .getManagedLedger().getLastConfirmedEntry(); assertEquals(seqId, msgNum - 1); assertEquals(position.getEntryId(), msgNum - 1); - //The first time, use topic-leve policies, 1 second delay + 1 second interval - Awaitility.await().atMost(2100, TimeUnit.MILLISECONDS) + //The first time, use topic-leve policies, 1 second delay + 3 second interval + Awaitility.await().atMost(5000, TimeUnit.MILLISECONDS) .until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor() .getMarkDeletedPosition()).getEntryId() == msgNum - 1); ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor(); PositionImpl markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition(); assertEquals(position, markDeletedPosition); - //remove topic-level policies, namespace-level should be used, interval becomes 2 seconds + //remove topic-level policies, namespace-level should be used, interval becomes 5 seconds admin.topics().removeDeduplicationSnapshotInterval(topicName); producer.newMessage().value("msg").send(); - //zk update time + interval time - Awaitility.await().atMost( 3000, TimeUnit.MILLISECONDS) + //zk update time + 5 second interval time + Awaitility.await().atMost( 7, TimeUnit.SECONDS) .until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor() .getMarkDeletedPosition()).getEntryId() == msgNum); markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition(); @@ -159,20 +159,20 @@ public class TopicDuplicationTest extends ProducerConsumerBase { assertEquals(msgNum, markDeletedPosition.getEntryId()); assertEquals(position, markDeletedPosition); - //4 remove namespace-level policies, broker-level should be used, interval becomes 2 seconds + //4 remove namespace-level policies, broker-level should be used, interval becomes 3 seconds admin.namespaces().removeDeduplicationSnapshotInterval(myNamespace); - Awaitility.await().atMost(2, TimeUnit.SECONDS) + Awaitility.await().atMost(4, TimeUnit.SECONDS) .until(() -> (admin.namespaces().getDeduplicationSnapshotInterval(myNamespace) == null)); producer.newMessage().value("msg").send(); //ensure that the time exceeds the scheduling interval of ns and topic, but no snapshot is generated Thread.sleep(3000); markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition(); position = (PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry(); - // broker-level interval is 5 seconds, so 3 seconds will not take a snapshot + // broker-level interval is 7 seconds, so 3 seconds will not take a snapshot assertNotEquals(msgNum + 1, markDeletedPosition.getEntryId()); assertNotEquals(position, markDeletedPosition); // wait for scheduler - Awaitility.await().atMost(3, TimeUnit.SECONDS) + Awaitility.await().atMost(5, TimeUnit.SECONDS) .until(() -> ((PositionImpl) persistentTopic.getMessageDeduplication().getManagedCursor() .getMarkDeletedPosition()).getEntryId() == msgNum + 1); markDeletedPosition = (PositionImpl) managedCursor.getMarkDeletedPosition();
