heesung-sn commented on code in PR #19546:
URL: https://github.com/apache/pulsar/pull/19546#discussion_r1114736634
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java:
##########
@@ -821,27 +934,48 @@ private void scheduleCleanup(String broker, long
delayInSecs) {
broker, delayInSecs, cleanupJobs.size());
}
+ private void overrideOwnership(String serviceUnit, ServiceUnitStateData
orphanData, Set<String> availableBrokers) {
+
+ Optional<String> selectedBroker =
brokerSelector.select(availableBrokers, null, getContext());
+ if (selectedBroker.isPresent()) {
+ var override = new ServiceUnitStateData(Owned,
selectedBroker.get(), true);
+ log.info("Overriding ownership serviceUnit:{} from orphanData:{}
to overrideData:{}",
+ serviceUnit, orphanData, override);
+ pubAsync(serviceUnit, override).whenComplete((__, e) -> {
+ if (e != null) {
+ log.error("Failed to override serviceUnit:{} from
orphanData:{} to overrideData:{}",
+ serviceUnit, orphanData, override, e);
+ }
+ });
+ } else {
+ log.error("Failed to override the ownership serviceUnit:{}
orphanData:{}. Empty selected broker.",
+ serviceUnit, orphanData);
+ }
+ }
+
- private void doCleanup(String broker) {
+ private void doCleanup(String broker) throws ExecutionException,
InterruptedException, TimeoutException {
long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}",
broker);
- int serviceUnitTombstoneCnt = 0;
+ int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
- for (Map.Entry<String, ServiceUnitStateData> etr :
tableview.entrySet()) {
- ServiceUnitStateData stateData = etr.getValue();
- String serviceUnit = etr.getKey();
- if (StringUtils.equals(broker, stateData.broker())
- || StringUtils.equals(broker, stateData.sourceBroker())) {
- log.info("Cleaning ownership serviceUnit:{}, stateData:{}.",
serviceUnit, stateData);
- tombstoneAsync(serviceUnit).whenComplete((__, e) -> {
- if (e != null) {
- log.error("Failed cleaning the ownership
serviceUnit:{}, stateData:{}, "
- + "cleanupErrorCnt:{}.",
- serviceUnit, stateData,
- totalCleanupErrorCnt.incrementAndGet() -
totalCleanupErrorCntStart);
- }
- });
- serviceUnitTombstoneCnt++;
+ var availableBrokers = new
HashSet(brokerRegistry.getAvailableBrokersAsync()
Review Comment:
Updated.
##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/ServiceUnitStateCompactionTest.java:
##########
@@ -473,52 +473,172 @@ public void testSeekEarliestAfterCompaction() throws
Exception {
.readCompacted(true).subscribe()) {
consumer.seek(MessageId.earliest);
Message<ServiceUnitStateData> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getValue().broker(), "content2");
+ Assert.assertEquals(m.getKey(), key);
+ Assert.assertEquals(m.getValue(), testValues.get(2));
}
try (Consumer<ServiceUnitStateData> consumer =
pulsarClient.newConsumer(schema).topic(topic).subscriptionName("sub1")
.readCompacted(false).subscribe()) {
consumer.seek(MessageId.earliest);
Message<ServiceUnitStateData> m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getValue().broker(), "content0");
+ Assert.assertEquals(m.getKey(), key);
+ Assert.assertEquals(m.getValue(), testValues.get(0));
m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getValue().broker(), "content1");
+ Assert.assertEquals(m.getKey(), key);
+ Assert.assertEquals(m.getValue(), testValues.get(1));
m = consumer.receive();
- Assert.assertEquals(m.getKey(), "key0");
- Assert.assertEquals(m.getValue().broker(), "content2");
+ Assert.assertEquals(m.getKey(), key);
+ Assert.assertEquals(m.getValue(), testValues.get(2));
}
}
@Test
- public void testBrokerRestartAfterCompaction() throws Exception {
+ public void testSlowTableviewAfterCompaction() throws Exception {
String topic = "persistent://my-property/use/my-ns/my-topic1";
+ String strategyClassName = "topicCompactionStrategyClassName";
+ strategy.checkBrokers(true);
+
+ pulsarClient.newConsumer(schema)
+ .topic(topic)
+ .subscriptionName("sub1")
+ .readCompacted(true)
+ .subscribe().close();
+
+ var fastTV = pulsar.getClient().newTableViewBuilder(schema)
+ .topic(topic)
+ .subscriptionName("fastTV")
+ .loadConf(Map.of(
+ strategyClassName,
+ ServiceUnitStateCompactionStrategy.class.getName()))
+ .create();
+
+ var defaultConf = getDefaultConf();
+ var additionalPulsarTestContext =
createAdditionalPulsarTestContext(defaultConf);
+ var pulsar2 = additionalPulsarTestContext.getPulsarService();
+
+ var slowTV = pulsar2.getClient().newTableViewBuilder(schema)
+ .topic(topic)
+ .subscriptionName("slowTV")
+ .loadConf(Map.of(
+ strategyClassName,
+ ServiceUnitStateCompactionStrategy.class.getName()))
+ .create();
+
+ var semaphore = new Semaphore(0);
+ AtomicBoolean handledReleased = new AtomicBoolean(false);
+
+ slowTV.listen((k, v) -> {
+ if (v.state() == Assigning) {
+ try {
+ // Stuck at handling Assigned
+ handledReleased.set(false);
+ semaphore.acquire();
+ //Thread.sleep(5000);
Review Comment:
Updated.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitState.java:
##########
@@ -24,60 +24,47 @@
/**
* Defines the possible states for service units.
*
- * The following diagram defines the valid state changes
- *
- * ┌───────────┐
- * ┌──────────┤ released │◄────────┐
- * │own └───────────┘ │release
- * │ │
- * │ │
- * ▼ │
- * ┌────────┐ assign(transfer) ┌─────┴────┐
- * │ ├───────────────────►│ │
- * │ owned │ │ assigned │
- * │ │◄───────────────────┤ │
- * └──┬─────┤ own └──────────┘
- * │ ▲ │ ▲
- * │ │ │ │
- * │ │ └──────────────┐ │
- * │ │ │ │
- * │ │ unload │ │ assign(assignment)
- * split │ │ │ │
- * │ │ │ │
- * │ │ create(child) │ │
- * │ │ │ │
- * ▼ │ │ │
- * ┌─────┴─────┐ └─────►┌───┴──────┐
- * │ │ │ │
- * │ splitting ├────────────────► │ free │
- * │ │ discard(parent)│ │
- * └───────────┘ └──────────┘
+ * Refer to Service Unit State Channel in
https://github.com/apache/pulsar/issues/16691 for additional details.
Review Comment:
Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]