This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
commit 74862956221c7353df551dbe17defe39860d2c61 Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Aug 28 06:42:42 2018 -0700 Support compaction options in Go client (#2449) Added the option for ReadCompacted in Go based consumer/reader configuration. --- pulsar-client-go/pulsar/c_consumer.go | 2 + pulsar-client-go/pulsar/c_reader.go | 2 + pulsar-client-go/pulsar/consumer.go | 12 ++- pulsar-client-go/pulsar/consumer_test.go | 141 ++++++++++++++++++++++++++++++- pulsar-client-go/pulsar/producer_test.go | 1 - pulsar-client-go/pulsar/reader.go | 9 ++ pulsar-client-go/pulsar/reader_test.go | 91 ++++++++++++++++++++ 7 files changed, 255 insertions(+), 3 deletions(-) diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go index 093dd9d..7f613b2 100644 --- a/pulsar-client-go/pulsar/c_consumer.go +++ b/pulsar-client-go/pulsar/c_consumer.go @@ -120,6 +120,8 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu C.pulsar_consumer_set_consumer_name(conf, name) } + C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted)) + subName := C.CString(options.SubscriptionName) defer C.free(unsafe.Pointer(subName)) diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go index 12c1103..04bb5cf 100644 --- a/pulsar-client-go/pulsar/c_reader.go +++ b/pulsar-client-go/pulsar/c_reader.go @@ -99,6 +99,8 @@ func createReaderAsync(client *client, options ReaderOptions, callback func(Read C.pulsar_reader_configuration_set_subscription_role_prefix(conf, prefix) } + C.pulsar_reader_configuration_set_read_compacted(conf, cBool(options.ReadCompacted)) + if options.Name != "" { name := C.CString(options.Name) defer C.free(unsafe.Pointer(name)) diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go index ed56d9e..b9f2616 100644 --- a/pulsar-client-go/pulsar/consumer.go +++ b/pulsar-client-go/pulsar/consumer.go @@ -20,8 +20,8 @@ package pulsar import ( - "time" "context" + "time" ) // Pair of a Consumer and Message @@ -92,6 +92,16 @@ type ConsumerOptions struct { // Set the consumer name. Name string + + // If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog + // of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for + // each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that + // point, the messages will be sent as normal. + // + // ReadCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e. + // failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a + // shared subscription, will lead to the subscription call throwing a PulsarClientException. + ReadCompacted bool } // An interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go index 75a454b..f81ce56 100644 --- a/pulsar-client-go/pulsar/consumer_test.go +++ b/pulsar-client-go/pulsar/consumer_test.go @@ -22,6 +22,9 @@ package pulsar import ( "context" "fmt" + "io/ioutil" + "net/http" + "strings" "testing" "time" ) @@ -99,6 +102,111 @@ func TestConsumer(t *testing.T) { consumer.Unsubscribe() } +func TestConsumerCompaction(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6650", + }) + + assertNil(t, err) + defer client.Close() + + topic := fmt.Sprintf("my-compaction-topic-%d", time.Now().Unix()) + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + + assertNil(t, err) + defer producer.Close() + + // Pre-create both subscriptions to retain published messages + consumer1, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub-1", + }) + + assertNil(t, err) + consumer1.Close() + + consumer2, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub-2", + ReadCompacted: true, + }) + + assertNil(t, err) + consumer2.Close() + + ctx := context.Background() + + for i := 0; i < 10; i++ { + if err := producer.Send(ctx, ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "Same-Key", + }); err != nil { + t.Fatal(err) + } + } + + // Compact topic and wait for operation to complete + url := fmt.Sprintf("http://localhost:8080/admin/v2/persistent/public/default/%s/compaction", topic) + makeHttpPutCall(t, url) + for { + res := makeHttpGetCall(t, url) + if strings.Contains(res, "RUNNING") { + fmt.Println("Compaction still running") + time.Sleep(100 * time.Millisecond) + continue + } else { + assertEqual(t, strings.Contains(res, "SUCCESS"), true) + fmt.Println("Compaction is done") + break + } + } + + // Restart the consumers + + consumer1, err = client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub-1", + }) + + assertNil(t, err) + defer consumer1.Close() + + consumer2, err = client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub-2", + ReadCompacted: true, + }) + + assertNil(t, err) + defer consumer2.Close() + + // Consumer-1 will receive all messages + for i := 0; i < 10; i++ { + msg, err := consumer1.Receive(context.Background()) + assertNil(t, err) + assertNotNil(t, msg) + + assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i)) + } + + // Consumer-2 will only receive the last message + msg, err := consumer2.Receive(context.Background()) + assertNil(t, err) + assertNotNil(t, msg) + assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9")) + + // No more messages on consumer-2 + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + msg, err = consumer2.Receive(ctx) + assertNil(t, msg) + assertNotNil(t, err) +} + func TestConsumerWithInvalidConf(t *testing.T) { client, err := NewClient(ClientOptions{ URL: "pulsar://localhost:6650", @@ -125,13 +233,44 @@ func TestConsumerWithInvalidConf(t *testing.T) { SubscriptionName: "my-subscription", }) - // Expect error in creating cosnumer + // Expect error in creating consumer assertNil(t, consumer) assertNotNil(t, err) assertEqual(t, err.(*Error).Result(), InvalidConfiguration) } +func makeHttpPutCall(t *testing.T, url string) string { + return makeHttpCall(t, http.MethodPut, url) +} + +func makeHttpGetCall(t *testing.T, url string) string { + return makeHttpCall(t, http.MethodGet, url) +} + +func makeHttpCall(t *testing.T, method string, url string) string { + client := http.Client{} + + req, err := http.NewRequest(method, url, nil) + if err != nil { + t.Fatal(err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + + res, err := client.Do(req) + if err != nil { + t.Fatal(err) + } + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + t.Fatal(err) + } + + return string(body) +} func TestConsumerMultiTopics(t *testing.T) { client, err := NewClient(ClientOptions{ diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go index d7748f7..940be85 100644 --- a/pulsar-client-go/pulsar/producer_test.go +++ b/pulsar-client-go/pulsar/producer_test.go @@ -62,7 +62,6 @@ func TestProducer(t *testing.T) { OperationTimeoutSeconds: 30, ConcurrentLookupRequests: 1000, MessageListenerThreads: 5, - EnableTLS: false, }) assertNil(t, err) diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go index 7015c9c..5592630 100644 --- a/pulsar-client-go/pulsar/reader.go +++ b/pulsar-client-go/pulsar/reader.go @@ -57,6 +57,15 @@ type ReaderOptions struct { // Set the subscription role prefix. The default prefix is "reader". SubscriptionRolePrefix string + + // If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog + // of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for + // each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that + // point, the messages will be sent as normal. + // + // ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent + // topics will lead to the reader create call throwing a PulsarClientException. + ReadCompacted bool } // A Reader can be used to scan through all the messages currently available in a topic. diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go index 3b075e1..a0a63ae 100644 --- a/pulsar-client-go/pulsar/reader_test.go +++ b/pulsar-client-go/pulsar/reader_test.go @@ -22,7 +22,9 @@ package pulsar import ( "context" "fmt" + "strings" "testing" + "time" ) func TestReaderConnectError(t *testing.T) { @@ -128,3 +130,92 @@ func TestReaderWithInvalidConf(t *testing.T) { assertEqual(t, err.(*Error).Result(), InvalidConfiguration) } + + +func TestReaderCompaction(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6650", + }) + + assertNil(t, err) + defer client.Close() + + topic := fmt.Sprintf("my-reader-compaction-topic-%d", time.Now().Unix()) + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + + assertNil(t, err) + defer producer.Close() + + ctx := context.Background() + + for i := 0; i < 10; i++ { + if err := producer.Send(ctx, ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + Key: "Same-Key", + }); err != nil { + t.Fatal(err) + } + } + + // Compact topic and wait for operation to complete + url := fmt.Sprintf("http://localhost:8080/admin/v2/persistent/public/default/%s/compaction", topic) + makeHttpPutCall(t, url) + for { + res := makeHttpGetCall(t, url) + if strings.Contains(res, "RUNNING") { + fmt.Println("Compaction still running") + time.Sleep(100 * time.Millisecond) + continue + } else { + assertEqual(t, strings.Contains(res, "SUCCESS"), true) + fmt.Println("Compaction is done") + break + } + } + + // Restart the consumers + + reader1, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessage, + }) + + assertNil(t, err) + defer reader1.Close() + + reader2, err := client.CreateReader(ReaderOptions{ + Topic: topic, + StartMessageID: EarliestMessage, + ReadCompacted: true, + }) + + assertNil(t, err) + defer reader2.Close() + + // Reader-1 will receive all messages + for i := 0; i < 10; i++ { + msg, err := reader1.Next(context.Background()) + assertNil(t, err) + assertNotNil(t, msg) + + assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i)) + } + + // Reader-2 will only receive the last message + msg, err := reader2.Next(context.Background()) + assertNil(t, err) + assertNotNil(t, msg) + assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9")) + + // No more messages on consumer-2 + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + msg, err = reader2.Next(ctx) + assertNil(t, msg) + assertNotNil(t, err) +} +