I have a couple of questions about topic compaction.

I have been able to demonstrate it working using the Python API (*). After producing messages with repeated keys, I ran a manual compaction:

$ apache-pulsar-2.4.1/bin/pulsar-admin topics compact avro-topic
Topic compaction requested for persistent://public/default/avro-topic
$ apache-pulsar-2.4.1/bin/pulsar-admin topics compaction-status avro-topic
Compaction was a success
$ apache-pulsar-2.4.1/bin/pulsar-admin topics reset-cursor persistent://public/default/avro-topic -s my-subscription -t 10h

The consumer (created with is_read_compacted=True) then sees only the latest message for each key.  That's great.
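
As a rough model of the behaviour I'm seeing (this is not Pulsar's actual implementation), compaction can be thought of as keeping only the last message for each key. The `compact` helper below is hypothetical; the keys and values mirror the producer script at the end of this message.

```python
def compact(messages):
    """Keep only the latest (key, value) pair for each key.

    The result is ordered by the position of each surviving message
    in the original sequence.
    """
    latest = {}
    for position, (key, value) in enumerate(messages):
        # A later message with the same key replaces the earlier one.
        latest[key] = (position, value)
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_position, value) in survivors]


# Same keys and payloads as the producer script: key = str(i % 3).
messages = [(str(i % 3), "hello%d" % i) for i in range(10)]
compacted = compact(messages)
print(compacted)
```

With the ten messages from the producer, this leaves one message per key: hello7, hello8 and hello9.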


However, all the messages in the original topic are still available - I can see them using the reader API.

Question 1: at what point, if ever, is the original topic purged to make space?  If I set an infinite retention policy on the topic, will compaction ever reclaim space?

Question 2: I don't understand what happens if you run compaction on an already-compacted topic.  Is the compacted topic compacted, or is the original topic re-compacted, or something else?

Regards,

Brian.


(*) Code:

=== producer ===

import pulsar

from pulsar.schema import *

class Foo(Record):
    a = String()
    b = Integer()
    c = Boolean()

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('avro-topic', schema=AvroSchema(Foo))

for i in range(10):
    producer.send(Foo(a="hello%d" % i, b=i, c=True), partition_key=str(i % 3))

client.close()

# Problem 1: the Python API doesn't have "key", but it does have "partition_key".  So even though I'm using a non-partitioned topic, I'm assuming that "partition_key" is equivalent to the message key.

=== reader ===

import pulsar

from pulsar.schema import *

class Foo(Record):
    a = String()
    b = Integer()
    c = Boolean()

client = pulsar.Client('pulsar://localhost:6650')

msg_id = pulsar.MessageId.earliest

reader = client.create_reader('avro-topic', msg_id,
         schema=AvroSchema(Foo),
         #is_read_compacted=True,
         )

while True:
    msg = reader.read_next()
    print("Received message %r key=%r id=%s" % (msg.value().a, msg.partition_key(), msg.message_id()))

# Problem 2: the "reader" API doesn't appear to have "is_read_compacted", so it's always reading from the non-compacted version of the topic.

=== consumer ===

import pulsar

from pulsar.schema import *

class Foo(Record):
    a = String()
    b = Integer()
    c = Boolean()

client = pulsar.Client('pulsar://localhost:6650')

consumer = client.subscribe('avro-topic', 'my-subscription', schema=AvroSchema(Foo),
            is_read_compacted=True)

while True:
    msg = consumer.receive()
    try:
        print("Received message %r key=%r id=%s" % (msg.value().a, msg.partition_key(), msg.message_id()))
        consumer.acknowledge(msg)
    except Exception as e:
        # Message failed to be processed
        print("Oops %r" % e)
        consumer.negative_acknowledge(msg)

# Works if the topic is manually compacted and then the cursor is manually reset
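
The manual steps could be scripted. Below is a minimal sketch, assuming the same pulsar-admin path, topic and subscription as in the transcript above. `admin_commands` is a hypothetical helper that only builds the command lines; actually running them is left commented out.

```python
import subprocess

# Path taken from the shell transcript above; adjust for your install.
PULSAR_ADMIN = "apache-pulsar-2.4.1/bin/pulsar-admin"


def admin_commands(topic, subscription, rewind="10h"):
    """Build the three pulsar-admin invocations used in the manual test."""
    full_topic = "persistent://public/default/" + topic
    return [
        [PULSAR_ADMIN, "topics", "compact", topic],
        [PULSAR_ADMIN, "topics", "compaction-status", topic],
        [PULSAR_ADMIN, "topics", "reset-cursor", full_topic,
         "-s", subscription, "-t", rewind],
    ]


commands = admin_commands("avro-topic", "my-subscription")
for argv in commands:
    print(" ".join(argv))
    # Uncomment to run against a live broker:
    # subprocess.run(argv, check=True)
```

Compaction is requested asynchronously, so a real script would probably need to check the compaction status before resetting the cursor.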

======

