sijie closed pull request #2450: Added Reader.HasNext in Go client URL: https://github.com/apache/incubator-pulsar/pull/2450
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one from a fork) once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go index 730f9b86fe..12c11034aa 100644 --- a/pulsar-client-go/pulsar/c_reader.go +++ b/pulsar-client-go/pulsar/c_reader.go @@ -146,6 +146,19 @@ func (r *reader) Next(ctx context.Context) (Message, error) { } } +func (r *reader) HasNext() (bool, error) { + value := C.int(0) + res := C.pulsar_reader_has_message_available(r.ptr, &value) + + if res != C.pulsar_result_Ok { + return false, newError(res, "Failed to check if next message is available") + } else if value == C.int(1) { + return true, nil + } else { + return false, nil + } +} + func (r *reader) Close() error { channel := make(chan error) r.CloseAsync(func(err error) { channel <- err; close(channel) }) diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go index f61ebd7410..7015c9ca4d 100644 --- a/pulsar-client-go/pulsar/reader.go +++ b/pulsar-client-go/pulsar/reader.go @@ -67,6 +67,9 @@ type Reader interface { // Read the next message in the topic, blocking until a message is available Next(context.Context) (Message, error) + // Check if there is any message available to read from the current position + HasNext() (bool, error) + // Close the reader and stop the broker to push more messages Close() error } diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go index 11d1b3620c..3b075e1d50 100644 --- a/pulsar-client-go/pulsar/reader_test.go +++ b/pulsar-client-go/pulsar/reader_test.go @@ -20,9 +20,9 @@ package pulsar import ( - "testing" - "fmt" "context" + "fmt" + "testing" ) func TestReaderConnectError(t *testing.T) { @@ -80,12 +80,20 @@ func TestReader(t *testing.T) { t.Fatal(err) } + hasNext, err := 
reader.HasNext() + assertNil(t, err) + assertEqual(t, hasNext, true) + + msg, err := reader.Next(ctx) assertNil(t, err) assertNotNil(t, msg) assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i)) } + + hasNext, err := reader.HasNext() + assertNil(t, err) + assertEqual(t, hasNext, false) } func TestReaderWithInvalidConf(t *testing.T) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services