This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 220dcb4  Export client metrics to Prometheus (#317)
220dcb4 is described below

commit 220dcb40d531c16e59d323d420ee8f00492001dd
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Fri Jul 10 12:11:30 2020 -0700

    Export client metrics to Prometheus (#317)
    
    * Export client metrics to Prometheus
    
    * Added missing messagesReceived.Inc()
---
 go.mod                            |  4 +-
 go.sum                            | 84 +++++++++++++++++++++++++++++++++++++--
 perf/pulsar-perf-go.go            | 12 ++++++
 pulsar/consumer_impl.go           | 29 +++++++++++++-
 pulsar/consumer_multitopic.go     |  2 +
 pulsar/consumer_partition.go      | 60 ++++++++++++++++++++++++++++
 pulsar/consumer_regex.go          |  2 +
 pulsar/impl_message.go            |  6 ++-
 pulsar/internal/connection.go     | 30 ++++++++++++++
 pulsar/internal/lookup_service.go | 11 +++++
 pulsar/internal/rpc_client.go     | 13 ++++++
 pulsar/producer_impl.go           | 24 +++++++++++
 pulsar/producer_partition.go      | 55 ++++++++++++++++++++++++-
 pulsar/reader_impl.go             | 17 ++++++++
 14 files changed, 340 insertions(+), 9 deletions(-)

diff --git a/go.mod b/go.mod
index f26abd0..afae8cc 100644
--- a/go.mod
+++ b/go.mod
@@ -9,9 +9,11 @@ require (
        github.com/gogo/protobuf v1.3.1
        github.com/inconshreveable/mousetrap v1.0.0 // indirect
        github.com/klauspost/compress v1.10.8
+       github.com/kr/pretty v0.2.0 // indirect
        github.com/pierrec/lz4 v2.0.5+incompatible
        github.com/pkg/errors v0.8.1
-       github.com/sirupsen/logrus v1.4.1
+       github.com/prometheus/client_golang v1.7.1
+       github.com/sirupsen/logrus v1.4.2
        github.com/spaolacci/murmur3 v1.1.0
        github.com/spf13/cobra v0.0.3
        github.com/spf13/pflag v1.0.3 // indirect
diff --git a/go.sum b/go.sum
index 302cf5b..9f51dbb 100644
--- a/go.sum
+++ b/go.sum
@@ -1,46 +1,106 @@
 github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32 
h1:/gZKpgSMydtrih81nvUhlkXpZIUfthKShSCVbRzBt9Y=
 github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod 
h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
+github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
+github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod 
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
+github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod 
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/ardielle/ardielle-go v1.5.2 
h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
 github.com/ardielle/ardielle-go v1.5.2/go.mod 
h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
 github.com/ardielle/ardielle-tools v1.5.4/go.mod 
h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 
h1:KXlsf+qt/X5ttPGEjR0tPH1xaWWoKBEg9Q1THAj2h3I=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod 
h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
+github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod 
h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
+github.com/beorn7/perks v1.0.0/go.mod 
h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod 
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b 
h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod 
h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
 github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod 
h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
+github.com/cespare/xxhash/v2 v2.1.1 
h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
+github.com/cespare/xxhash/v2 v2.1.1/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dimfeld/httptreemux v5.0.1+incompatible 
h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod 
h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
+github.com/go-kit/kit v0.8.0/go.mod 
h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-kit/kit v0.9.0/go.mod 
h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
+github.com/go-logfmt/logfmt v0.3.0/go.mod 
h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
+github.com/go-logfmt/logfmt v0.4.0/go.mod 
h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
+github.com/go-stack/stack v1.8.0/go.mod 
h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/gogo/protobuf v1.1.1/go.mod 
h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
 github.com/gogo/protobuf v1.3.1/go.mod 
h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
+github.com/golang/protobuf v1.2.0/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.1/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.3.2/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod 
h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod 
h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod 
h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod 
h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod 
h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2 
h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
+github.com/golang/protobuf v1.4.2/go.mod 
h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
 github.com/google/go-cmp v0.3.0/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
 github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/gorilla/context v1.1.1/go.mod 
h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
 github.com/gorilla/mux v1.7.3/go.mod 
h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/inconshreveable/mousetrap v1.0.0 
h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod 
h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/jawher/mow.cli v1.0.4/go.mod 
h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
 github.com/jawher/mow.cli v1.1.0/go.mod 
h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg=
+github.com/json-iterator/go v1.1.6/go.mod 
h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
+github.com/json-iterator/go v1.1.10/go.mod 
h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
+github.com/julienschmidt/httprouter v1.2.0/go.mod 
h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
 github.com/kisielk/errcheck v1.2.0/go.mod 
h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
 github.com/kisielk/gotool v1.0.0/go.mod 
h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
-github.com/klauspost/compress v1.10.5 
h1:7q6vHIqubShURwQz8cQK6yIe/xC3IF0Vm7TGfqjewrc=
-github.com/klauspost/compress v1.10.5/go.mod 
h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
 github.com/klauspost/compress v1.10.8 
h1:eLeJ3dr/Y9+XRfJT4l+8ZjmtB5RPJhucH2HeCV5+IZY=
 github.com/klauspost/compress v1.10.8/go.mod 
h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
+github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod 
h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
+github.com/kr/pretty v0.1.0/go.mod 
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
+github.com/kr/pretty v0.2.0/go.mod 
h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/matttproud/golang_protobuf_extensions v1.0.1 
h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
+github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod 
h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
+github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod 
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/modern-go/reflect2 v1.0.1/go.mod 
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod 
h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/pierrec/lz4 v2.0.5+incompatible 
h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod 
h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pkg/errors v0.8.0/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod 
h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/sirupsen/logrus v1.4.1 
h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
-github.com/sirupsen/logrus v1.4.1/go.mod 
h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
+github.com/prometheus/client_golang v0.9.1/go.mod 
h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
+github.com/prometheus/client_golang v1.0.0/go.mod 
h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
+github.com/prometheus/client_golang v1.7.1 
h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA=
+github.com/prometheus/client_golang v1.7.1/go.mod 
h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
+github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod 
h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
+github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod 
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/client_model v0.2.0 
h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
+github.com/prometheus/client_model v0.2.0/go.mod 
h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
+github.com/prometheus/common v0.4.1/go.mod 
h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
+github.com/prometheus/common v0.10.0 
h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc=
+github.com/prometheus/common v0.10.0/go.mod 
h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
+github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod 
h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
+github.com/prometheus/procfs v0.0.2/go.mod 
h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
+github.com/prometheus/procfs v0.1.3 
h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
+github.com/prometheus/procfs v0.1.3/go.mod 
h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
+github.com/sirupsen/logrus v1.2.0/go.mod 
h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.2 
h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
+github.com/sirupsen/logrus v1.4.2/go.mod 
h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
 github.com/spaolacci/murmur3 v1.1.0 
h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
 github.com/spaolacci/murmur3 v1.1.0/go.mod 
h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
@@ -58,24 +118,36 @@ github.com/stretchr/testify v1.4.0 
h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
 github.com/stretchr/testify v1.4.0/go.mod 
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
 github.com/yahoo/athenz v1.8.55/go.mod 
h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
+golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 
h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa 
h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M=
 golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 
h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
+golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod 
h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod 
h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
@@ -84,11 +156,15 @@ google.golang.org/protobuf 
v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
 google.golang.org/protobuf v1.21.0/go.mod 
h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
 google.golang.org/protobuf v1.23.0 
h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
 google.golang.org/protobuf v1.23.0/go.mod 
h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod 
h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 
h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod 
h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/square/go-jose.v2 v2.4.1/go.mod 
h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
+gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
 gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index 3cd1d52..8fc0e09 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -24,17 +24,20 @@ import (
        _ "net/http/pprof"
        "os"
        "os/signal"
+       "strconv"
 
        "github.com/spf13/cobra"
 
        log "github.com/sirupsen/logrus"
 
        "github.com/apache/pulsar-client-go/pulsar"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
 )
 
 // FlagProfile is a global flag
 var FlagProfile bool
 var flagDebug bool
+var PrometheusPort int
 
 type ClientArgs struct {
        ServiceURL string
@@ -71,6 +74,7 @@ func main() {
 
        flags := rootCmd.PersistentFlags()
        flags.BoolVar(&FlagProfile, "profile", false, "enable profiling")
+       flags.IntVar(&PrometheusPort, "metrics", 8000, "Port to use to export 
metrics for Prometheus. Use -1 to disable.")
        flags.BoolVar(&flagDebug, "debug", false, "enable debug output")
        flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u",
                "pulsar://localhost:6650", "The Pulsar service URL")
@@ -78,6 +82,14 @@ func main() {
        rootCmd.AddCommand(newProducerCommand())
        rootCmd.AddCommand(newConsumerCommand())
 
+       if PrometheusPort > 0 {
+               go func() {
+                       log.Info("Starting Prometheus metrics at 
http://localhost:";, PrometheusPort, "/metrics")
+                       http.Handle("/metrics", promhttp.Handler())
+                       http.ListenAndServe(":"+strconv.Itoa(PrometheusPort), 
nil)
+               }()
+       }
+
        err := rootCmd.Execute()
        if err != nil {
                fmt.Fprintf(os.Stderr, "executing command error=%+v\n", err)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index feebcf2..e3db670 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -25,12 +25,32 @@ import (
        "sync"
        "time"
 
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promauto"
+
        log "github.com/sirupsen/logrus"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 )
 
+var (
+       consumersOpened = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_consumers_opened",
+               Help: "Counter of consumers created by the client",
+       })
+
+       consumersClosed = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_consumers_closed",
+               Help: "Counter of consumers closed by the client",
+       })
+
+       consumersPartitions = promauto.NewGauge(prometheus.GaugeOpts{
+               Name: "pulsar_client_consumers_partitions_active",
+               Help: "Counter of individual partitions the consumers are 
currently active",
+       })
+)
+
 var ErrConsumerClosed = errors.New("consumer closed")
 
 const defaultNackRedeliveryDelay = 1 * time.Minute
@@ -276,12 +296,17 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                return err
        }
 
+       consumersPartitions.Add(float64(partitionsToAdd))
        return nil
 }
 
 func topicSubscribe(client *client, options ConsumerOptions, topic string,
        messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) 
{
-       return newInternalConsumer(client, options, topic, messageCh, 
dlqRouter, false)
+       c, err := newInternalConsumer(client, options, topic, messageCh, 
dlqRouter, false)
+       if err == nil {
+               consumersOpened.Inc()
+       }
+       return c, err
 }
 
 func (c *consumer) Subscription() string {
@@ -381,6 +406,8 @@ func (c *consumer) Close() {
                c.ticker.Stop()
                c.client.handlers.Del(c)
                c.dlq.close()
+               consumersClosed.Inc()
+               consumersPartitions.Sub(float64(len(c.consumers)))
        })
 }
 
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index ec4d57a..f526487 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -72,6 +72,7 @@ func newMultiTopicConsumer(client *client, options 
ConsumerOptions, topics []str
                return nil, errs
        }
 
+       consumersOpened.Inc()
        return mtc, nil
 }
 
@@ -165,6 +166,7 @@ func (c *multiTopicConsumer) Close() {
                wg.Wait()
                close(c.closeCh)
                c.dlq.close()
+               consumersClosed.Inc()
        })
 }
 
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index e539e51..acf897d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -23,6 +23,9 @@ import (
        "sync"
        "time"
 
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promauto"
+
        "github.com/gogo/protobuf/proto"
 
        log "github.com/sirupsen/logrus"
@@ -32,6 +35,49 @@ import (
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 )
 
+var (
+       messagesReceived = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_messages_received",
+               Help: "Counter of messages received by the client",
+       })
+
+       bytesReceived = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_bytes_received",
+               Help: "Counter of bytes received by the client",
+       })
+
+       prefetchedMessages = promauto.NewGauge(prometheus.GaugeOpts{
+               Name: "pulsar_client_consumer_prefetched_messages",
+               Help: "Number of messages currently sitting in the consumer 
pre-fetch queue",
+       })
+
+       prefetchedBytes = promauto.NewGauge(prometheus.GaugeOpts{
+               Name: "pulsar_client_consumer_prefetched_bytes",
+               Help: "Total number of bytes currently sitting in the consumer 
pre-fetch queue",
+       })
+
+       acksCounter = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_consumer_acks",
+               Help: "Counter of messages acked by client",
+       })
+
+       nacksCounter = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_consumer_nacks",
+               Help: "Counter of messages nacked by client",
+       })
+
+       dlqCounter = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_consumer_dlq_messages",
+               Help: "Counter of messages sent to Dead letter queue",
+       })
+
+       processingTime = promauto.NewHistogram(prometheus.HistogramOpts{
+               Name:    "pulsar_client_consumer_processing_time_seconds",
+               Help:    "Time it takes for application to process messages",
+               Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, 
.5, 1, 2.5, 5, 10},
+       })
+)
+
 type consumerState int
 
 const (
@@ -222,6 +268,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req 
*getLastMsgIDRequest)
 
 func (pc *partitionConsumer) AckID(msgID messageID) {
        if !msgID.IsZero() && msgID.ack() {
+               acksCounter.Inc()
+               
processingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano())
 / 1.0e9)
                req := &ackRequest{
                        msgID: msgID,
                }
@@ -231,6 +279,7 @@ func (pc *partitionConsumer) AckID(msgID messageID) {
 
 func (pc *partitionConsumer) NackID(msgID messageID) {
        pc.nackTracker.Add(msgID)
+       nacksCounter.Inc()
 }
 
 func (pc *partitionConsumer) Redeliver(msgIds []messageID) {
@@ -390,6 +439,10 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
        if numMsgs > 1 {
                ackTracker = newAckTracker(numMsgs)
        }
+
+       messagesReceived.Add(float64(numMsgs))
+       prefetchedMessages.Add(float64(numMsgs))
+
        for i := 0; i < numMsgs; i++ {
                smm, payload, err := reader.ReadMessage()
                if err != nil {
@@ -397,6 +450,9 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                        return err
                }
 
+               bytesReceived.Add(float64(len(payload)))
+               prefetchedBytes.Add(float64(len(payload)))
+
                msgID := newTrackingMessageID(
                        int64(pbMsgID.GetLedgerId()),
                        int64(pbMsgID.GetEntryId()),
@@ -507,11 +563,15 @@ func (pc *partitionConsumer) dispatcher() {
 
                        if pc.dlq.shouldSendToDlq(&nextMessage) {
                                // pass the message to the DLQ router
+                               dlqCounter.Inc()
                                messageCh = pc.dlq.Chan()
                        } else {
                                // pass the message to application channel
                                messageCh = pc.messageCh
                        }
+
+                       prefetchedMessages.Dec()
+                       prefetchedBytes.Sub(float64(len(messages[0].payLoad)))
                } else {
                        // we are ready for more messages
                        queueCh = pc.queueCh
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index ff7cbca..a6dfd56 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -114,6 +114,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn 
*internal.TopicName, p
 
        go rc.monitor()
 
+       consumersOpened.Inc()
        return rc, nil
 }
 
@@ -214,6 +215,7 @@ func (c *regexConsumer) Close() {
                }
                wg.Wait()
                c.dlq.close()
+               consumersClosed.Inc()
        })
 }
 
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index f1a9a7c..562dfb6 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -34,8 +34,9 @@ type messageID struct {
        batchIdx     int32
        partitionIdx int32
 
-       tracker  *ackTracker
-       consumer acker
+       tracker      *ackTracker
+       consumer     acker
+       receivedTime time.Time
 }
 
 func (id messageID) IsZero() bool {
@@ -130,6 +131,7 @@ func newTrackingMessageID(ledgerID int64, entryID int64, 
batchIdx int32, partiti
                batchIdx:     batchIdx,
                partitionIdx: partitionIdx,
                tracker:      tracker,
+               receivedTime: time.Now(),
        }
 }
 
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 06d1543..cc770bb 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -29,6 +29,9 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promauto"
+
        "github.com/gogo/protobuf/proto"
        log "github.com/sirupsen/logrus"
 
@@ -44,6 +47,28 @@ const (
        PulsarProtocolVersion = int32(pb.ProtocolVersion_v13)
 )
 
+var (
+       connectionsOpened = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_connections_opened",
+               Help: "Counter of connections created by the client",
+       })
+
+       connectionsClosed = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_connections_closed",
+               Help: "Counter of connections closed by the client",
+       })
+
+       connectionsEstablishmentErrors = 
promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_connections_establishment_errors",
+               Help: "Counter of errors in connections establishment",
+       })
+
+       connectionsHandshakeErrors = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_connections_handshake_errors",
+               Help: "Counter of errors in connections handshake (eg: authz)",
+       })
+)
+
 type TLSOptions struct {
        TrustCertsFilePath      string
        AllowInsecureConnection bool
@@ -201,11 +226,14 @@ func (c *connection) start() {
        go func() {
                if c.connect() {
                        if c.doHandshake() {
+                               connectionsOpened.Inc()
                                c.run()
                        } else {
+                               connectionsHandshakeErrors.Inc()
                                c.changeState(connectionClosed)
                        }
                } else {
+                       connectionsEstablishmentErrors.Inc()
                        c.changeState(connectionClosed)
                }
        }()
@@ -695,6 +723,8 @@ func (c *connection) Close() {
        for _, handler := range consumerHandlers {
                handler.ConnectionClosed()
        }
+
+       connectionsClosed.Inc()
 }
 
 func (c *connection) changeState(state connectionState) {
diff --git a/pulsar/internal/lookup_service.go 
b/pulsar/internal/lookup_service.go
index 9537c57..1673f4d 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -22,12 +22,22 @@ import (
        "fmt"
        "net/url"
 
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promauto"
+
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/gogo/protobuf/proto"
 
        log "github.com/sirupsen/logrus"
 )
 
+var (
+       lookupRequestsCount = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_lookup_count",
+               Help: "Counter of lookup requests made by the client",
+       })
+)
+
 // LookupResult encapsulates a struct for lookup a request, containing two 
parts: LogicalAddr, PhysicalAddr.
 type LookupResult struct {
        LogicalAddr  *url.URL
@@ -82,6 +92,7 @@ func (ls *lookupService) getBrokerAddress(lr 
*pb.CommandLookupTopicResponse) (lo
 const lookupResultMaxRedirect = 20
 
 func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
+       lookupRequestsCount.Inc()
        id := ls.rpcClient.NewRequestID()
        res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP, 
&pb.CommandLookupTopic{
                RequestId:     &id,
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index ad896ed..0d4cc12 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -24,12 +24,22 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promauto"
+
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/gogo/protobuf/proto"
 
        log "github.com/sirupsen/logrus"
 )
 
+var (
+       rpcRequestCount = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_rpc_count",
+               Help: "Counter of RPC requests made by the client",
+       })
+)
+
 type RPCResult struct {
        Response *pb.BaseCommand
        Cnx      Connection
@@ -81,6 +91,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, 
cmdType pb.BaseCommand_
 
 func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, 
requestID uint64,
        cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
+       rpcRequestCount.Inc()
        cnx, err := c.getConn(logicalAddr, physicalAddr)
        if err != nil {
                return nil, err
@@ -132,6 +143,7 @@ func (c *rpcClient) getConn(logicalAddr *url.URL, 
physicalAddr *url.URL) (Connec
 
 func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType 
pb.BaseCommand_Type,
        message proto.Message) (*RPCResult, error) {
+       rpcRequestCount.Inc()
        wg := sync.WaitGroup{}
        wg.Add(1)
 
@@ -151,6 +163,7 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID 
uint64, cmdType pb.Ba
 }
 
 func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType 
pb.BaseCommand_Type, message proto.Message) {
+       rpcRequestCount.Inc()
        cnx.SendRequestNoWait(baseCommand(cmdType, message))
 }
 
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 35dae28..01c5d76 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -24,11 +24,31 @@ import (
        "time"
        "unsafe"
 
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promauto"
+
        log "github.com/sirupsen/logrus"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
+var (
+       producersOpened = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_producers_opened",
+               Help: "Counter of producers created by the client",
+       })
+
+       producersClosed = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_producers_closed",
+               Help: "Counter of producers closed by the client",
+       })
+
+       producersPartitions = promauto.NewGauge(prometheus.GaugeOpts{
+               Name: "pulsar_client_producers_partitions_active",
+               Help: "Counter of individual partitions the producers are 
currently active",
+       })
+)
+
 type producer struct {
        sync.RWMutex
        client        *client
@@ -103,6 +123,7 @@ func newProducer(client *client, options *ProducerOptions) 
(*producer, error) {
                }
        }()
 
+       producersOpened.Inc()
        return p, nil
 }
 
@@ -182,6 +203,7 @@ func (p *producer) internalCreatePartitionsProducers() 
error {
                return err
        }
 
+       producersPartitions.Add(float64(partitionsToAdd))
        atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
        atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
        return nil
@@ -262,4 +284,6 @@ func (p *producer) Close() {
                pp.Close()
        }
        p.client.handlers.Del(p)
+       producersPartitions.Sub(float64(len(p.producers)))
+       producersClosed.Inc()
 }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 9243804..b8dc13d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -24,6 +24,9 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promauto"
+
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
 
        "github.com/gogo/protobuf/proto"
@@ -49,6 +52,39 @@ var (
        buffersPool sync.Pool
 )
 
+var (
+       messagesPublished = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_messages_published",
+               Help: "Counter of messages published by the client",
+       })
+
+       bytesPublished = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_bytes_published",
+               Help: "Counter of messages published by the client",
+       })
+
+       messagesPending = promauto.NewGauge(prometheus.GaugeOpts{
+               Name: "pulsar_client_producer_pending_messages",
+               Help: "Counter of messages pending to be published by the 
client",
+       })
+
+       bytesPending = promauto.NewGauge(prometheus.GaugeOpts{
+               Name: "pulsar_client_producer_pending_bytes",
+               Help: "Counter of bytes pending to be published by the client",
+       })
+
+       publishErrors = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_producer_errors",
+               Help: "Counter of publish errors",
+       })
+
+       publishLatency = promauto.NewHistogram(prometheus.HistogramOpts{
+               Name:    "pulsar_client_producer_latency_seconds",
+               Help:    "Publish latency experienced by the client",
+               Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, 
.5, 1, 2.5, 5, 10},
+       })
+)
+
 type partitionProducer struct {
        state  int32
        client *client
@@ -261,6 +297,7 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                p.log.WithField("size", len(msg.Payload)).
                        WithField("properties", msg.Properties).
                        WithError(errMessageTooLarge).Error()
+               publishErrors.Inc()
                return
        }
 
@@ -399,13 +436,19 @@ func (p *partitionProducer) SendAsync(ctx 
context.Context, msg *ProducerMessage,
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg 
*ProducerMessage,
        callback func(MessageID, *ProducerMessage, error), flushImmediately 
bool) {
-       p.publishSemaphore.Acquire()
+
        sr := &sendRequest{
                ctx:              ctx,
                msg:              msg,
                callback:         callback,
                flushImmediately: flushImmediately,
+               publishTime:      time.Now(),
        }
+
+       messagesPending.Inc()
+       bytesPending.Add(float64(len(sr.msg.Payload)))
+
+       p.publishSemaphore.Acquire()
        p.eventsChan <- sr
 }
 
@@ -426,6 +469,8 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
        // The ack was indeed for the expected item in the queue, we can remove 
it and trigger the callback
        p.pendingQueue.Poll()
 
+       now := time.Now().UnixNano()
+
        // lock the pending item while sending the requests
        pi.Lock()
        defer pi.Unlock()
@@ -434,6 +479,13 @@ func (p *partitionProducer) ReceivedSendReceipt(response 
*pb.CommandSendReceipt)
                if sr.msg != nil {
                        atomic.StoreInt64(&p.lastSequenceID, 
int64(pi.sequenceID))
                        p.publishSemaphore.Release()
+
+                       
publishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
+                       messagesPublished.Inc()
+                       messagesPending.Dec()
+                       payloadSize := float64(len(sr.msg.Payload))
+                       bytesPublished.Add(payloadSize)
+                       bytesPending.Sub(payloadSize)
                }
 
                if sr.callback != nil {
@@ -516,6 +568,7 @@ type sendRequest struct {
        ctx              context.Context
        msg              *ProducerMessage
        callback         func(MessageID, *ProducerMessage, error)
+       publishTime      time.Time
        flushImmediately bool
 }
 
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index b74b35b..d97cc96 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -20,6 +20,9 @@ package pulsar
 import (
        "context"
 
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promauto"
+
        log "github.com/sirupsen/logrus"
 )
 
@@ -27,6 +30,18 @@ const (
        defaultReceiverQueueSize = 1000
 )
 
+var (
+       readersOpened = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_readers_opened",
+               Help: "Counter of readers created by the client",
+       })
+
+       readersClosed = promauto.NewCounter(prometheus.CounterOpts{
+               Name: "pulsar_client_readers_closed",
+               Help: "Counter of readers closed by the client",
+       })
+)
+
 type reader struct {
        pc                  *partitionConsumer
        messageCh           chan ConsumerMessage
@@ -101,6 +116,7 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
        }
 
        reader.pc = pc
+       readersOpened.Inc()
        return reader, nil
 }
 
@@ -162,4 +178,5 @@ func (r *reader) hasMoreMessages() bool {
 
 func (r *reader) Close() {
        r.pc.Close()
+       readersClosed.Inc()
 }

Reply via email to