Denovo1998 commented on code in PR #25171:
URL: https://github.com/apache/pulsar/pull/25171#discussion_r2727446391


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java:
##########
@@ -1023,6 +1026,47 @@ public void testReplicatedSubscriptionWithCompaction() 
throws Exception {
         Assert.assertEquals(result, List.of("V2"));
     }
 
+    @Test
+    public void testReplicatedSubscriptionWithSpecificSubscriptionName() 
throws Exception {
+        final String namespace = 
BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
+        final String topicName = "persistent://" + namespace + 
"/testReadMarkers";
+        final String subName = "__supervisor-01";
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, 
Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        PulsarClient client1 = 
PulsarClient.builder().serviceUrl(url1.toString())
+            .statsInterval(0, TimeUnit.SECONDS).build();
+
+
+        Producer<String> producer = 
client1.newProducer(Schema.STRING).topic(topicName).create();
+        producer.newMessage().value("message-1").send();
+        producer.newMessage().value("message-2").send();
+        producer.close();
+
+        // create subscription in r1
+        createReplicatedSubscription(client1, topicName, subName, true);
+
+        Awaitility.await().untilAsserted(() -> {
+            Map<String, Boolean> status = 
admin1.topics().getReplicatedSubscriptionStatus(topicName, subName);
+            assertTrue(status.get(topicName));
+        });
+
+        @Cleanup
+        PulsarClient client2 = 
PulsarClient.builder().serviceUrl(url2.toString())
+            .statsInterval(0, TimeUnit.SECONDS).build();
+
+        var reader = RawReader.create(client2, topicName, subName).get();

Review Comment:
   The reader needs to be closed; please use try-with-resources.
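
   A minimal sketch of the suggested pattern, not the PR's actual code. It assumes the reader exposes only an asynchronous `closeAsync()`; `FakeReader` is a hypothetical stand-in used so the example is self-contained. If the real reader type implements `AutoCloseable`, it can be declared directly in the try header instead.

```java
import java.util.concurrent.CompletableFuture;

public class CloseReaderSketch {
    // Hypothetical stand-in for a reader that only offers an async close.
    static class FakeReader {
        boolean closed = false;

        CompletableFuture<Void> closeAsync() {
            closed = true;
            return CompletableFuture.completedFuture(null);
        }
    }

    // Uses the reader inside try-with-resources and reports whether it was closed.
    static boolean closedAfterUse() {
        FakeReader reader = new FakeReader();
        // The lambda adapts closeAsync() to AutoCloseable, so the reader is
        // closed when the block exits, even if an assertion inside it fails.
        try (AutoCloseable ignored = () -> reader.closeAsync().get()) {
            // read messages and run assertions here
        } catch (Exception e) {
            throw new IllegalStateException("closing the reader failed", e);
        }
        return reader.closed;
    }

    public static void main(String[] args) {
        System.out.println(closedAfterUse());
    }
}
```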



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -326,6 +325,22 @@ public int filterEntriesForConsumer(@Nullable 
MessageMetadata[] metadataArray, i
         return totalEntries;
     }
 
+    private boolean isMarkerAllowedDeliveryToSubscription(ManagedCursor 
cursor) {
+        if (cursor == null || cursor.getName() == null) {
+            return false;
+        }
+        var name = cursor.getName();
+        if (Compactor.COMPACTION_SUBSCRIPTION.equals(name)) {
+            return true;
+        }
+        for (String prefix : 
serviceConfig.getSubscriptionPrefixToSkipServerMarkerCheck()) {
+            if (name.startsWith(prefix)) {

Review Comment:
   If the configured set contains "" (empty string), startsWith("") returns true for every name, so all subscriptions would receive the marker.
   
   If it contains "__", many internal subscriptions might receive the marker.
   
   Should these two values be ignored here?
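
   A sketch of one way to guard against both values, offered as an illustration rather than the change the PR should make. `MarkerPrefixCheck` and `isMarkerAllowed` are hypothetical names, the set of prefixes is passed in directly instead of being read from `serviceConfig`, and the constant below is an assumed stand-in for `Compactor.COMPACTION_SUBSCRIPTION`.

```java
import java.util.Set;

public class MarkerPrefixCheck {
    // Assumed stand-in for Compactor.COMPACTION_SUBSCRIPTION (illustration only).
    static final String COMPACTION_SUBSCRIPTION = "__compaction";

    static boolean isMarkerAllowed(String name, Set<String> prefixes) {
        if (name == null) {
            return false;
        }
        if (COMPACTION_SUBSCRIPTION.equals(name)) {
            return true;
        }
        for (String prefix : prefixes) {
            // Ignore empty prefixes (which match every name) and a bare "__"
            // (which matches every internal subscription).
            if (prefix == null || prefix.isEmpty() || "__".equals(prefix)) {
                continue;
            }
            if (name.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Without the guard, an empty prefix matches any subscription name.
        System.out.println("my-sub".startsWith(""));
        System.out.println(isMarkerAllowed("my-sub", Set.of("", "__")));
        System.out.println(isMarkerAllowed("__supervisor-01", Set.of("__supervisor")));
    }
}
```

   Rejecting such values when the configuration is loaded would be an alternative to skipping them on every check.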



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java:
##########
@@ -1939,6 +1941,27 @@ public void testReadCommittedWithReadCompacted() throws 
Exception{
         Assert.assertEquals(messages, List.of("V2", "V3"));
     }
 
+    @Test
+    public void testRawReaderWithTransactionMarker() throws Exception {
+        final String namespace = "tnx/ns-read-marker";
+        final String topic = "persistent://" + namespace + 
"/test_transaction_topic" + UUID.randomUUID();
+        admin.namespaces().createNamespace(namespace);
+
+        @Cleanup
+        Producer<String> producer = 
this.pulsarClient.newProducer(Schema.STRING)
+            .topic(topic)
+            .create();
+
+        Transaction txn = pulsarClient.newTransaction().build().get();
+        producer.newMessage(txn).value("message-1").send();
+        txn.commit().get();
+
+        var reader = RawReader.create(pulsarClient, topic, 
"__supervisor-01").get();

Review Comment:
   The reader needs to be closed; please use try-with-resources.



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -4019,6 +4019,15 @@ public double 
getLoadBalancerBandwidthOutResourceWeight() {
     )
     private Set<String> additionalServlets = new TreeSet<>();
 
+    @FieldContext(
+        category = CATEGORY_SERVER,
+        doc = "For some use case like compaction, raw reader want to read all 
the data from the topics and handle the "
+              + "marker by the reader. It needs to skip the marker check to 
delivery the message to the consumer. "
+              + "This configuration allows to configure a subscription prefix, 
the the reader which has the prefix "
+              + "will receive all the data."

Review Comment:
   typo: "the the reader…"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
