This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit 74862956221c7353df551dbe17defe39860d2c61
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Tue Aug 28 06:42:42 2018 -0700

    Support compaction options in Go client (#2449)
    
    Added the option for ReadCompacted in Go based consumer/reader 
configuration.
---
 pulsar-client-go/pulsar/c_consumer.go    |   2 +
 pulsar-client-go/pulsar/c_reader.go      |   2 +
 pulsar-client-go/pulsar/consumer.go      |  12 ++-
 pulsar-client-go/pulsar/consumer_test.go | 141 ++++++++++++++++++++++++++++++-
 pulsar-client-go/pulsar/producer_test.go |   1 -
 pulsar-client-go/pulsar/reader.go        |   9 ++
 pulsar-client-go/pulsar/reader_test.go   |  91 ++++++++++++++++++++
 7 files changed, 255 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_consumer.go 
b/pulsar-client-go/pulsar/c_consumer.go
index 093dd9d..7f613b2 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -120,6 +120,8 @@ func subscribeAsync(client *client, options 
ConsumerOptions, callback func(Consu
                C.pulsar_consumer_set_consumer_name(conf, name)
        }
 
+       C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted))
+
        subName := C.CString(options.SubscriptionName)
        defer C.free(unsafe.Pointer(subName))
 
diff --git a/pulsar-client-go/pulsar/c_reader.go 
b/pulsar-client-go/pulsar/c_reader.go
index 12c1103..04bb5cf 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -99,6 +99,8 @@ func createReaderAsync(client *client, options ReaderOptions, 
callback func(Read
                
C.pulsar_reader_configuration_set_subscription_role_prefix(conf, prefix)
        }
 
+       C.pulsar_reader_configuration_set_read_compacted(conf, 
cBool(options.ReadCompacted))
+
        if options.Name != "" {
                name := C.CString(options.Name)
                defer C.free(unsafe.Pointer(name))
diff --git a/pulsar-client-go/pulsar/consumer.go 
b/pulsar-client-go/pulsar/consumer.go
index ed56d9e..b9f2616 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -20,8 +20,8 @@
 package pulsar
 
 import (
-       "time"
        "context"
+       "time"
 )
 
 // Pair of a Consumer and Message
@@ -92,6 +92,16 @@ type ConsumerOptions struct {
 
        // Set the consumer name.
        Name string
+
+       // If enabled, the consumer will read messages from the compacted topic 
rather than reading the full message backlog
+       // of the topic. This means that, if the topic has been compacted, the 
consumer will only see the latest value for
+       // each key in the topic, up until the point in the topic message 
backlog that has been compacted. Beyond that
+       // point, the messages will be sent as normal.
+       //
+       // ReadCompacted can only be enabled on subscriptions to persistent 
topics, which have a single active consumer (i.e.
+       //  failover or exclusive subscriptions). Attempting to enable it on 
subscriptions to non-persistent topics or on a
+       //  shared subscription will lead to the subscription call throwing a 
PulsarClientException.
+       ReadCompacted bool
 }
 
 // An interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar-client-go/pulsar/consumer_test.go 
b/pulsar-client-go/pulsar/consumer_test.go
index 75a454b..f81ce56 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -22,6 +22,9 @@ package pulsar
 import (
        "context"
        "fmt"
+       "io/ioutil"
+       "net/http"
+       "strings"
        "testing"
        "time"
 )
@@ -99,6 +102,111 @@ func TestConsumer(t *testing.T) {
        consumer.Unsubscribe()
 }
 
+func TestConsumerCompaction(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://localhost:6650",
+       })
+
+       assertNil(t, err)
+       defer client.Close()
+
+       topic := fmt.Sprintf("my-compaction-topic-%d", time.Now().Unix())
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+
+       assertNil(t, err)
+       defer producer.Close()
+
+       // Pre-create both subscriptions to retain published messages
+       consumer1, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub-1",
+       })
+
+       assertNil(t, err)
+       consumer1.Close()
+
+       consumer2, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub-2",
+               ReadCompacted:    true,
+       })
+
+       assertNil(t, err)
+       consumer2.Close()
+
+       ctx := context.Background()
+
+       for i := 0; i < 10; i++ {
+               if err := producer.Send(ctx, ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "Same-Key",
+               }); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       // Compact topic and wait for operation to complete
+       url := 
fmt.Sprintf("http://localhost:8080/admin/v2/persistent/public/default/%s/compaction",
 topic)
+       makeHttpPutCall(t, url)
+       for {
+               res := makeHttpGetCall(t, url)
+               if strings.Contains(res, "RUNNING") {
+                       fmt.Println("Compaction still running")
+                       time.Sleep(100 * time.Millisecond)
+                       continue
+               } else {
+                       assertEqual(t, strings.Contains(res, "SUCCESS"), true)
+                       fmt.Println("Compaction is done")
+                       break
+               }
+       }
+
+       // Restart the consumers
+
+       consumer1, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub-1",
+       })
+
+       assertNil(t, err)
+       defer consumer1.Close()
+
+       consumer2, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub-2",
+               ReadCompacted:    true,
+       })
+
+       assertNil(t, err)
+       defer consumer2.Close()
+
+       // Consumer-1 will receive all messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer1.Receive(context.Background())
+               assertNil(t, err)
+               assertNotNil(t, msg)
+
+               assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", 
i))
+       }
+
+       // Consumer-2 will only receive the last message
+       msg, err := consumer2.Receive(context.Background())
+       assertNil(t, err)
+       assertNotNil(t, msg)
+       assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
+
+       // No more messages on consumer-2
+       ctx, cancel := context.WithTimeout(context.Background(), 
100*time.Millisecond)
+       defer cancel()
+
+       msg, err = consumer2.Receive(ctx)
+       assertNil(t, msg)
+       assertNotNil(t, err)
+}
+
 func TestConsumerWithInvalidConf(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: "pulsar://localhost:6650",
@@ -125,13 +233,44 @@ func TestConsumerWithInvalidConf(t *testing.T) {
                SubscriptionName: "my-subscription",
        })
 
-       // Expect error in creating cosnumer
+       // Expect error in creating consumer
        assertNil(t, consumer)
        assertNotNil(t, err)
 
        assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
 }
 
+func makeHttpPutCall(t *testing.T, url string) string {
+       return makeHttpCall(t, http.MethodPut, url)
+}
+
+func makeHttpGetCall(t *testing.T, url string) string {
+       return makeHttpCall(t, http.MethodGet, url)
+}
+
+func makeHttpCall(t *testing.T, method string, url string) string {
+       client := http.Client{}
+
+       req, err := http.NewRequest(method, url, nil)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       req.Header.Set("Content-Type", "application/json")
+       req.Header.Set("Accept", "application/json")
+
+       res, err := client.Do(req)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       body, err := ioutil.ReadAll(res.Body)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       return string(body)
+}
 
 func TestConsumerMultiTopics(t *testing.T) {
        client, err := NewClient(ClientOptions{
diff --git a/pulsar-client-go/pulsar/producer_test.go 
b/pulsar-client-go/pulsar/producer_test.go
index d7748f7..940be85 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -62,7 +62,6 @@ func TestProducer(t *testing.T) {
                OperationTimeoutSeconds:  30,
                ConcurrentLookupRequests: 1000,
                MessageListenerThreads:   5,
-               EnableTLS:                false,
        })
 
        assertNil(t, err)
diff --git a/pulsar-client-go/pulsar/reader.go 
b/pulsar-client-go/pulsar/reader.go
index 7015c9c..5592630 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -57,6 +57,15 @@ type ReaderOptions struct {
 
        // Set the subscription role prefix. The default prefix is "reader".
        SubscriptionRolePrefix string
+
+       // If enabled, the reader will read messages from the compacted topic 
rather than reading the full message backlog
+       // of the topic. This means that, if the topic has been compacted, the 
reader will only see the latest value for
+       // each key in the topic, up until the point in the topic message 
backlog that has been compacted. Beyond that
+       // point, the messages will be sent as normal.
+       //
+       // ReadCompacted can only be enabled when reading from a persistent 
topic. Attempting to enable it on non-persistent
+       // topics will lead to the reader create call throwing a 
PulsarClientException.
+       ReadCompacted bool
 }
 
 // A Reader can be used to scan through all the messages currently available 
in a topic.
diff --git a/pulsar-client-go/pulsar/reader_test.go 
b/pulsar-client-go/pulsar/reader_test.go
index 3b075e1..a0a63ae 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -22,7 +22,9 @@ package pulsar
 import (
        "context"
        "fmt"
+       "strings"
        "testing"
+       "time"
 )
 
 func TestReaderConnectError(t *testing.T) {
@@ -128,3 +130,92 @@ func TestReaderWithInvalidConf(t *testing.T) {
 
        assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
 }
+
+
+func TestReaderCompaction(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://localhost:6650",
+       })
+
+       assertNil(t, err)
+       defer client.Close()
+
+       topic := fmt.Sprintf("my-reader-compaction-topic-%d", time.Now().Unix())
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+
+       assertNil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+
+       for i := 0; i < 10; i++ {
+               if err := producer.Send(ctx, ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+                       Key:     "Same-Key",
+               }); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       // Compact topic and wait for operation to complete
+       url := 
fmt.Sprintf("http://localhost:8080/admin/v2/persistent/public/default/%s/compaction",
 topic)
+       makeHttpPutCall(t, url)
+       for {
+               res := makeHttpGetCall(t, url)
+               if strings.Contains(res, "RUNNING") {
+                       fmt.Println("Compaction still running")
+                       time.Sleep(100 * time.Millisecond)
+                       continue
+               } else {
+                       assertEqual(t, strings.Contains(res, "SUCCESS"), true)
+                       fmt.Println("Compaction is done")
+                       break
+               }
+       }
+
+       // Create the readers
+
+       reader1, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessage,
+       })
+
+       assertNil(t, err)
+       defer reader1.Close()
+
+       reader2, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessage,
+               ReadCompacted:  true,
+       })
+
+       assertNil(t, err)
+       defer reader2.Close()
+
+       // Reader-1 will receive all messages
+       for i := 0; i < 10; i++ {
+               msg, err := reader1.Next(context.Background())
+               assertNil(t, err)
+               assertNotNil(t, msg)
+
+               assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", 
i))
+       }
+
+       // Reader-2 will only receive the last message
+       msg, err := reader2.Next(context.Background())
+       assertNil(t, err)
+       assertNotNil(t, msg)
+       assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-9"))
+
+       // No more messages on reader-2
+       ctx, cancel := context.WithTimeout(context.Background(), 
100*time.Millisecond)
+       defer cancel()
+
+       msg, err = reader2.Next(ctx)
+       assertNil(t, msg)
+       assertNotNil(t, err)
+}
+

Reply via email to