This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9213e58 Attempt to fix flakyness of BrokerBkEnsemblesTests.testSkipCorruptDataLedger (#2318) 9213e58 is described below commit 9213e58a74dc26096cc132eec709ae3d95ade095 Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Aug 7 09:11:28 2018 +0900 Attempt to fix flakyness of BrokerBkEnsemblesTests.testSkipCorruptDataLedger (#2318) --- .../broker/service/BrokerBkEnsemblesTests.java | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index ce023c3..94b2226 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -19,6 +19,9 @@ package org.apache.pulsar.broker.service; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; +import static org.testng.Assert.assertEquals; + +import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.net.URL; @@ -54,8 +57,6 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.collect.Sets; - /** */ public class BrokerBkEnsemblesTests { @@ -236,8 +237,11 @@ public class BrokerBkEnsemblesTests { * * @throws Exception */ - @Test(timeOut = 6000) + @Test public void testSkipCorruptDataLedger() throws Exception { + // Ensure intended state for autoSkipNonRecoverableData + admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "false"); + PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); @@ -246,9 +250,13 @@ public class BrokerBkEnsemblesTests { final int totalDataLedgers = 5; final int entriesPerLedger = totalMessages / totalDataLedgers; - admin.namespaces().createNamespace(ns1); + try { + admin.namespaces().createNamespace(ns1); + } catch (Exception e) { - final String topic1 = "persistent://" + ns1 + "/my-topic"; + } + + final String topic1 = "persistent://" + ns1 + "/my-topic-" + System.currentTimeMillis(); // Create subscription Consumer<byte[]> consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name") @@ -287,6 +295,7 @@ public class BrokerBkEnsemblesTests { // (2) delete first 4 data-ledgers ledgerInfo.entrySet().forEach(entry -> { if (!entry.equals(lastLedger)) { + assertEquals(entry.getValue().getEntries(), entriesPerLedger); try { bookKeeper.deleteLedger(entry.getKey()); } catch (Exception e) { @@ -322,7 +331,7 @@ public class BrokerBkEnsemblesTests { // (5) consumer will be able to consume 20 messages from last non-deleted ledger consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe(); for (int i = 0; i < entriesPerLedger; i++) { - msg = consumer.receive(5, TimeUnit.SECONDS); + msg = consumer.receive(); System.out.println(i); consumer.acknowledge(msg); } @@ -330,7 +339,6 @@ public class BrokerBkEnsemblesTests { producer.close(); consumer.close(); client.close(); - } private static final Logger LOG = LoggerFactory.getLogger(BrokerBkEnsemblesTests.class);