This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9fe2f41 Fix TableViewImpl not retrying partition update on exceptions (#14408) 9fe2f41 is described below commit 9fe2f418200f4231326fc273e5671c5536b9bf65 Author: Ziyao Wei <ziyao.wei....@gmail.com> AuthorDate: Wed Feb 23 14:03:42 2022 -0500 Fix TableViewImpl not retrying partition update on exceptions (#14408) * Fix TableViewImpl not retrying partition update on exceptions * Log exception and undo whitespace changes * Change topic name in unit test --- .../apache/pulsar/client/impl/TableViewTest.java | 57 ++++++++++++++++++++++ .../apache/pulsar/client/impl/TableViewImpl.java | 3 +- 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java index 9d235ee..4f0b704 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java @@ -30,12 +30,16 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.TableView; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -158,4 +162,57 @@ public class TableViewTest extends MockedPulsarServiceBaseTest { }); Assert.assertEquals(tv.keySet(), keys2); } + + + @Test(timeOut = 30 * 1000) + // Regression test for making sure partition changes are always periodically checked even after a check returned + // exceptionally. + public void testTableViewUpdatePartitionsTriggeredDespiteExceptions() throws Exception { + String topic = "persistent://public/default/tableview-test-update-partitions-triggered-despite-exceptions"; + admin.topics().createPartitionedTopic(topic, 3); + int count = 20; + Set<String> keys = this.publishMessages(topic, count, false); + PulsarClient spyPulsarClient = Mockito.spy(pulsarClient); + @Cleanup + TableView<byte[]> tv = spyPulsarClient.newTableViewBuilder(Schema.BYTES) + .topic(topic) + .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS) + .create(); + log.info("start tv size: {}", tv.size()); + tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v))); + Awaitility.await().untilAsserted(() -> { + log.info("Current tv size: {}", tv.size()); + Assert.assertEquals(tv.size(), count); + }); + Assert.assertEquals(tv.keySet(), keys); + tv.forEachAndListen((k, v) -> log.info("checkpoint {} -> {}", k, new String(v))); + + // Let update partition check throw an exception + Mockito.doReturn(FutureUtil.failedFuture(new PulsarClientException(""))) + .when(spyPulsarClient) + .getPartitionsForTopic(Mockito.any()); + + admin.topics().updatePartitionedTopic(topic, 4); + TopicName topicName = TopicName.get(topic); + + // Make sure the get partitions callback is called; it should throw an exception + Mockito.verify(spyPulsarClient).getPartitionsForTopic(Mockito.any()); + + // Send more data to partition 3, which is not in the current TableView, need update partitions + Set<String> keys2 = + this.publishMessages(topicName.getPartition(3).toString(), count * 2, false); + + // Wait for 10 seconds; verify that the messages haven't arrived, which would have happened if the partitions + // has been updated + TimeUnit.SECONDS.sleep(10); + Assert.assertEquals(tv.size(), count); + + // Let update partition check succeed, and check the messages eventually arrives + Mockito.doCallRealMethod().when(spyPulsarClient).getPartitionsForTopic(Mockito.any()); + Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { + log.info("Current tv size: {}", tv.size()); + Assert.assertEquals(tv.size(), count * 2); + }); + Assert.assertEquals(tv.keySet(), keys2); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java index ab9bf11..e767156 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java @@ -105,7 +105,8 @@ public class TableViewImpl<T> implements TableView<T> { start().whenComplete((tw, ex) -> { if (ex != null) { - log.warn("Failed to check for changes in number of partitions"); + log.warn("Failed to check for changes in number of partitions: {}", ex); + schedulePartitionsCheck(); } }); }