I have a couple of questions about topic compaction.
I have been able to demonstrate it working using the Python API (*).
After producing messages with repeated keys, I did a manual compaction:
$ apache-pulsar-2.4.1/bin/pulsar-admin topics compact avro-topic
Topic compaction requested for persistent://public/default/avro-topic
$ apache-pulsar-2.4.1/bin/pulsar-admin topics compaction-status avro-topic
Compaction was a success
$ apache-pulsar-2.4.1/bin/pulsar-admin topics reset-cursor \
    persistent://public/default/avro-topic -s my-subscription -t 10h
The consumer (created with is_read_compacted=True) then sees only the
latest message for each key. That's great.
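To make the expected result concrete, here is a minimal sketch in plain Python (no Pulsar involved) of the "latest message per key" rule applied to the ten messages my producer sends. The helper name compact_latest is hypothetical, not part of the Pulsar API, and I am assuming the surviving messages keep their original relative order.

```python
def compact_latest(messages):
    """Keep only the last (key, value) pair seen for each key.

    `messages` is a list of (key, value) tuples in publish order.
    Hypothetical helper: it models the rule only, not Pulsar's implementation.
    """
    last_index = {}
    for index, (key, _value) in enumerate(messages):
        last_index[key] = index  # later messages overwrite earlier ones
    # Assumption: survivors stay in their original publish order.
    return [messages[i] for i in sorted(last_index.values())]


# Same keys and payloads as the producer: key = str(i % 3), a = "hello%d" % i
published = [(str(i % 3), "hello%d" % i) for i in range(10)]
compacted = compact_latest(published)
print(compacted)
```

For the producer's data this leaves hello7, hello8 and hello9 (keys 1, 2 and 0), which is what I mean by "the latest message for each key".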
However, all the messages in the original topic are still available - I
can see them using the reader API.
Question 1: at what point, if ever, is the original topic purged to make
space? If I set an infinite retention policy on the topic, will
compaction ever reclaim space?
Question 2: I don't understand what happens if you run compaction on an
already-compacted topic. Is the compacted topic compacted, or is the
original topic re-compacted, or something else?
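One reason I can't tell these apart from the consumer side: under a plain "latest per key" rule, both readings give the same visible result. A small sketch in plain Python (the compact helper and the sample data are made up for illustration and say nothing about what the broker actually does):

```python
def compact(log):
    """Return the log with only the last entry per key, in original order."""
    last = {key: i for i, (key, _value) in enumerate(log)}
    return [entry for i, entry in enumerate(log) if last[entry[0]] == i]


original = [("k1", "a"), ("k2", "b"), ("k1", "c")]
newer = [("k2", "d"), ("k3", "e")]  # published after the first compaction

first_pass = compact(original)

# Reading A: the already-compacted data plus newer messages is compacted.
compact_the_compacted = compact(first_pass + newer)
# Reading B: the full original topic, including newer messages, is re-compacted.
recompact_the_original = compact(original + newer)

print(compact_the_compacted == recompact_the_original)
```

So my question is really about what happens in storage, not about what a compacted read returns.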
Regards,
Brian.
(*) Code:
=== producer ===
import pulsar
from pulsar.schema import *
class Foo(Record):
    a = String()
    b = Integer()
    c = Boolean()
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('avro-topic', schema=AvroSchema(Foo))
for i in range(10):
    producer.send(Foo(a="hello%d" % i, b=i, c=True),
                  partition_key=str(i % 3))
client.close()
# Problem 1: the Python API doesn't have "key", but it does have
# "partition_key". So even though I'm using a non-partitioned topic, I'm
# assuming that "partition_key" equates to "message key".
=== reader ===
import pulsar
from pulsar.schema import *
class Foo(Record):
    a = String()
    b = Integer()
    c = Boolean()
client = pulsar.Client('pulsar://localhost:6650')
msg_id = pulsar.MessageId.earliest
reader = client.create_reader('avro-topic', msg_id,
                              schema=AvroSchema(Foo),
                              #is_read_compacted=True,
                              )
while True:
    msg = reader.read_next()
    print("Received message %r key=%r id=%s" % (msg.value().a,
          msg.partition_key(), msg.message_id()))
# Problem 2: the "reader" API doesn't appear to have
# "is_read_compacted", so it's always reading from the non-compacted
# version of the topic.
=== consumer ===
import pulsar
from pulsar.schema import *
class Foo(Record):
    a = String()
    b = Integer()
    c = Boolean()
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('avro-topic', 'my-subscription',
                            schema=AvroSchema(Foo),
                            is_read_compacted=True)
while True:
    msg = consumer.receive()
    try:
        print("Received message %r key=%r id=%s" % (msg.value().a,
              msg.partition_key(), msg.message_id()))
        consumer.acknowledge(msg)
    except Exception as e:
        # Message failed to be processed
        print("Oops %r" % e)
        consumer.negative_acknowledge(msg)
# Works if the topic is manually compacted and then the cursor is
# manually reset.
======