This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 220dcb4 Export client metrics to Prometheus (#317) 220dcb4 is described below commit 220dcb40d531c16e59d323d420ee8f00492001dd Author: Matteo Merli <mme...@apache.org> AuthorDate: Fri Jul 10 12:11:30 2020 -0700 Export client metrics to Prometheus (#317) * Export client metrics to Prometheus * Added missing messagesReceived.Inc() --- go.mod | 4 +- go.sum | 84 +++++++++++++++++++++++++++++++++++++-- perf/pulsar-perf-go.go | 12 ++++++ pulsar/consumer_impl.go | 29 +++++++++++++- pulsar/consumer_multitopic.go | 2 + pulsar/consumer_partition.go | 60 ++++++++++++++++++++++++++++ pulsar/consumer_regex.go | 2 + pulsar/impl_message.go | 6 ++- pulsar/internal/connection.go | 30 ++++++++++++++ pulsar/internal/lookup_service.go | 11 +++++ pulsar/internal/rpc_client.go | 13 ++++++ pulsar/producer_impl.go | 24 +++++++++++ pulsar/producer_partition.go | 55 ++++++++++++++++++++++++- pulsar/reader_impl.go | 17 ++++++++ 14 files changed, 340 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index f26abd0..afae8cc 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,11 @@ require ( github.com/gogo/protobuf v1.3.1 github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/klauspost/compress v1.10.8 + github.com/kr/pretty v0.2.0 // indirect github.com/pierrec/lz4 v2.0.5+incompatible github.com/pkg/errors v0.8.1 - github.com/sirupsen/logrus v1.4.1 + github.com/prometheus/client_golang v1.7.1 + github.com/sirupsen/logrus v1.4.2 github.com/spaolacci/murmur3 v1.1.0 github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.3 // indirect diff --git a/go.sum b/go.sum index 302cf5b..9f51dbb 100644 --- a/go.sum +++ b/go.sum @@ -1,46 +1,106 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32 h1:/gZKpgSMydtrih81nvUhlkXpZIUfthKShSCVbRzBt9Y= github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4= github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI= github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk= github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 h1:KXlsf+qt/X5ttPGEjR0tPH1xaWWoKBEg9Q1THAj2h3I= github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE= github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA= github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk= github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.10.5 h1:7q6vHIqubShURwQz8cQK6yIe/xC3IF0Vm7TGfqjewrc= -github.com/klauspost/compress v1.10.5/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.10.8 h1:eLeJ3dr/Y9+XRfJT4l+8ZjmtB5RPJhucH2HeCV5+IZY= github.com/klauspost/compress v1.10.8/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= +github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= -github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_golang v1.7.1 h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA= +github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M= +github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc= +github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= +github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= @@ -58,24 +118,36 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0= github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M= golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80= +golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -84,11 +156,15 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/square/go-jose.v2 v2.4.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go index 3cd1d52..8fc0e09 100644 --- a/perf/pulsar-perf-go.go +++ b/perf/pulsar-perf-go.go @@ -24,17 +24,20 @@ import ( _ "net/http/pprof" "os" "os/signal" + "strconv" "github.com/spf13/cobra" log "github.com/sirupsen/logrus" "github.com/apache/pulsar-client-go/pulsar" + "github.com/prometheus/client_golang/prometheus/promhttp" ) // FlagProfile is a global flag var FlagProfile bool var flagDebug bool +var PrometheusPort int type ClientArgs struct { ServiceURL string @@ -71,6 +74,7 @@ func main() { flags := rootCmd.PersistentFlags() flags.BoolVar(&FlagProfile, "profile", false, "enable profiling") + flags.IntVar(&PrometheusPort, "metrics", 8000, "Port to use to export metrics for Prometheus. Use -1 to disable.") flags.BoolVar(&flagDebug, "debug", false, "enable debug output") flags.StringVarP(&clientArgs.ServiceURL, "service-url", "u", "pulsar://localhost:6650", "The Pulsar service URL") @@ -78,6 +82,14 @@ func main() { rootCmd.AddCommand(newProducerCommand()) rootCmd.AddCommand(newConsumerCommand()) + if PrometheusPort > 0 { + go func() { + log.Info("Starting Prometheus metrics at http://localhost:", PrometheusPort, "/metrics") + http.Handle("/metrics", promhttp.Handler()) + http.ListenAndServe(":"+strconv.Itoa(PrometheusPort), nil) + }() + } + err := rootCmd.Execute() if err != nil { fmt.Fprintf(os.Stderr, "executing command error=%+v\n", err) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index feebcf2..e3db670 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -25,12 +25,32 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + log "github.com/sirupsen/logrus" "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" ) +var ( + consumersOpened = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_consumers_opened", + Help: "Counter of consumers created by the client", + }) + + consumersClosed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_consumers_closed", + Help: "Counter of consumers closed by the client", + }) + + consumersPartitions = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pulsar_client_consumers_partitions_active", + Help: "Counter of individual partitions the consumers are currently active", + }) +) + var ErrConsumerClosed = errors.New("consumer closed") const defaultNackRedeliveryDelay = 1 * time.Minute @@ -276,12 +296,17 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { return err } + consumersPartitions.Add(float64(partitionsToAdd)) return nil } func topicSubscribe(client *client, options ConsumerOptions, topic string, messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) { - return newInternalConsumer(client, options, topic, messageCh, dlqRouter, false) + c, err := newInternalConsumer(client, options, topic, messageCh, dlqRouter, false) + if err == nil { + consumersOpened.Inc() + } + return c, err } func (c *consumer) Subscription() string { @@ -381,6 +406,8 @@ func (c *consumer) Close() { c.ticker.Stop() c.client.handlers.Del(c) c.dlq.close() + consumersClosed.Inc() + consumersPartitions.Sub(float64(len(c.consumers))) }) } diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index ec4d57a..f526487 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -72,6 +72,7 @@ func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []str return nil, errs } + consumersOpened.Inc() return mtc, nil } @@ -165,6 +166,7 @@ func (c *multiTopicConsumer) Close() { wg.Wait() close(c.closeCh) c.dlq.close() + consumersClosed.Inc() }) } diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index e539e51..acf897d 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -23,6 +23,9 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/gogo/protobuf/proto" log "github.com/sirupsen/logrus" @@ -32,6 +35,49 @@ import ( pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" ) +var ( + messagesReceived = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_messages_received", + Help: "Counter of messages received by the client", + }) + + bytesReceived = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_bytes_received", + Help: "Counter of bytes received by the client", + }) + + prefetchedMessages = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pulsar_client_consumer_prefetched_messages", + Help: "Number of messages currently sitting in the consumer pre-fetch queue", + }) + + prefetchedBytes = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pulsar_client_consumer_prefetched_bytes", + Help: "Total number of bytes currently sitting in the consumer pre-fetch queue", + }) + + acksCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_consumer_acks", + Help: "Counter of messages acked by client", + }) + + nacksCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_consumer_nacks", + Help: "Counter of messages nacked by client", + }) + + dlqCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_consumer_dlq_messages", + Help: "Counter of messages sent to Dead letter queue", + }) + + processingTime = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "pulsar_client_consumer_processing_time_seconds", + Help: "Time it takes for application to process messages", + Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}, + }) +) + type consumerState int const ( @@ -222,6 +268,8 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) func (pc *partitionConsumer) AckID(msgID messageID) { if !msgID.IsZero() && msgID.ack() { + acksCounter.Inc() + processingTime.Observe(float64(time.Now().UnixNano()-msgID.receivedTime.UnixNano()) / 1.0e9) req := &ackRequest{ msgID: msgID, } @@ -231,6 +279,7 @@ func (pc *partitionConsumer) AckID(msgID messageID) { func (pc *partitionConsumer) NackID(msgID messageID) { pc.nackTracker.Add(msgID) + nacksCounter.Inc() } func (pc *partitionConsumer) Redeliver(msgIds []messageID) { @@ -390,6 +439,10 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header if numMsgs > 1 { ackTracker = newAckTracker(numMsgs) } + + messagesReceived.Add(float64(numMsgs)) + prefetchedMessages.Add(float64(numMsgs)) + for i := 0; i < numMsgs; i++ { smm, payload, err := reader.ReadMessage() if err != nil { @@ -397,6 +450,9 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header return err } + bytesReceived.Add(float64(len(payload))) + prefetchedBytes.Add(float64(len(payload))) + msgID := newTrackingMessageID( int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), @@ -507,11 +563,15 @@ func (pc *partitionConsumer) dispatcher() { if pc.dlq.shouldSendToDlq(&nextMessage) { // pass the message to the DLQ router + dlqCounter.Inc() messageCh = pc.dlq.Chan() } else { // pass the message to application channel messageCh = pc.messageCh } + + prefetchedMessages.Dec() + prefetchedBytes.Sub(float64(len(messages[0].payLoad))) } else { // we are ready for more messages queueCh = pc.queueCh diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index ff7cbca..a6dfd56 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -114,6 +114,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p go rc.monitor() + consumersOpened.Inc() return rc, nil } @@ -214,6 +215,7 @@ func (c *regexConsumer) Close() { } wg.Wait() c.dlq.close() + consumersClosed.Inc() }) } diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go index f1a9a7c..562dfb6 100644 --- a/pulsar/impl_message.go +++ b/pulsar/impl_message.go @@ -34,8 +34,9 @@ type messageID struct { batchIdx int32 partitionIdx int32 - tracker *ackTracker - consumer acker + tracker *ackTracker + consumer acker + receivedTime time.Time } func (id messageID) IsZero() bool { @@ -130,6 +131,7 @@ func newTrackingMessageID(ledgerID int64, entryID int64, batchIdx int32, partiti batchIdx: batchIdx, partitionIdx: partitionIdx, tracker: tracker, + receivedTime: time.Now(), } } diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go index 06d1543..cc770bb 100644 --- a/pulsar/internal/connection.go +++ b/pulsar/internal/connection.go @@ -29,6 +29,9 @@ import ( "sync/atomic" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/gogo/protobuf/proto" log "github.com/sirupsen/logrus" @@ -44,6 +47,28 @@ const ( PulsarProtocolVersion = int32(pb.ProtocolVersion_v13) ) +var ( + connectionsOpened = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_connections_opened", + Help: "Counter of connections created by the client", + }) + + connectionsClosed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_connections_closed", + Help: "Counter of connections closed by the client", + }) + + connectionsEstablishmentErrors = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_connections_establishment_errors", + Help: "Counter of errors in connections establishment", + }) + + connectionsHandshakeErrors = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_connections_handshake_errors", + Help: "Counter of errors in connections handshake (eg: authz)", + }) +) + type TLSOptions struct { TrustCertsFilePath string AllowInsecureConnection bool @@ -201,11 +226,14 @@ func (c *connection) start() { go func() { if c.connect() { if c.doHandshake() { + connectionsOpened.Inc() c.run() } else { + connectionsHandshakeErrors.Inc() c.changeState(connectionClosed) } } else { + connectionsEstablishmentErrors.Inc() c.changeState(connectionClosed) } }() @@ -695,6 +723,8 @@ func (c *connection) Close() { for _, handler := range consumerHandlers { handler.ConnectionClosed() } + + connectionsClosed.Inc() } func (c *connection) changeState(state connectionState) { diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go index 9537c57..1673f4d 100644 --- a/pulsar/internal/lookup_service.go +++ b/pulsar/internal/lookup_service.go @@ -22,12 +22,22 @@ import ( "fmt" "net/url" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/gogo/protobuf/proto" log "github.com/sirupsen/logrus" ) +var ( + lookupRequestsCount = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_lookup_count", + Help: "Counter of lookup requests made by the client", + }) +) + // LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr. type LookupResult struct { LogicalAddr *url.URL @@ -82,6 +92,7 @@ func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (lo const lookupResultMaxRedirect = 20 func (ls *lookupService) Lookup(topic string) (*LookupResult, error) { + lookupRequestsCount.Inc() id := ls.rpcClient.NewRequestID() res, err := ls.rpcClient.RequestToAnyBroker(id, pb.BaseCommand_LOOKUP, &pb.CommandLookupTopic{ RequestId: &id, diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go index ad896ed..0d4cc12 100644 --- a/pulsar/internal/rpc_client.go +++ b/pulsar/internal/rpc_client.go @@ -24,12 +24,22 @@ import ( "sync/atomic" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/gogo/protobuf/proto" log "github.com/sirupsen/logrus" ) +var ( + rpcRequestCount = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_rpc_count", + Help: "Counter of RPC requests made by the client", + }) +) + type RPCResult struct { Response *pb.BaseCommand Cnx Connection @@ -81,6 +91,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { + rpcRequestCount.Inc() cnx, err := c.getConn(logicalAddr, physicalAddr) if err != nil { return nil, err @@ -132,6 +143,7 @@ func (c *rpcClient) getConn(logicalAddr *url.URL, physicalAddr *url.URL) (Connec func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) { + rpcRequestCount.Inc() wg := sync.WaitGroup{} wg.Add(1) @@ -151,6 +163,7 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.Ba } func (c *rpcClient) RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) { + rpcRequestCount.Inc() cnx.SendRequestNoWait(baseCommand(cmdType, message)) } diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 35dae28..01c5d76 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -24,11 +24,31 @@ import ( "time" "unsafe" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + log "github.com/sirupsen/logrus" "github.com/apache/pulsar-client-go/pulsar/internal" ) +var ( + producersOpened = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_producers_opened", + Help: "Counter of producers created by the client", + }) + + producersClosed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_producers_closed", + Help: "Counter of producers closed by the client", + }) + + producersPartitions = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pulsar_client_producers_partitions_active", + Help: "Counter of individual partitions the producers are currently active", + }) +) + type producer struct { sync.RWMutex client *client @@ -103,6 +123,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { } }() + producersOpened.Inc() return p, nil } @@ -182,6 +203,7 @@ func (p *producer) internalCreatePartitionsProducers() error { return err } + producersPartitions.Add(float64(partitionsToAdd)) atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers)) atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers))) return nil @@ -262,4 +284,6 @@ func (p *producer) Close() { pp.Close() } p.client.handlers.Del(p) + producersPartitions.Sub(float64(len(p.producers))) + producersClosed.Inc() } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 9243804..b8dc13d 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -24,6 +24,9 @@ import ( "sync/atomic" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/apache/pulsar-client-go/pulsar/internal/compression" "github.com/gogo/protobuf/proto" @@ -49,6 +52,39 @@ var ( buffersPool sync.Pool ) +var ( + messagesPublished = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_messages_published", + Help: "Counter of messages published by the client", + }) + + bytesPublished = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_bytes_published", + Help: "Counter of messages published by the client", + }) + + messagesPending = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pulsar_client_producer_pending_messages", + Help: "Counter of messages pending to be published by the client", + }) + + bytesPending = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "pulsar_client_producer_pending_bytes", + Help: "Counter of bytes pending to be published by the client", + }) + + publishErrors = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_producer_errors", + Help: "Counter of publish errors", + }) + + publishLatency = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "pulsar_client_producer_latency_seconds", + Help: "Publish latency experienced by the client", + Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}, + }) +) + type partitionProducer struct { state int32 client *client @@ -261,6 +297,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { p.log.WithField("size", len(msg.Payload)). WithField("properties", msg.Properties). WithError(errMessageTooLarge).Error() + publishErrors.Inc() return } @@ -399,13 +436,19 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { - p.publishSemaphore.Acquire() + sr := &sendRequest{ ctx: ctx, msg: msg, callback: callback, flushImmediately: flushImmediately, + publishTime: time.Now(), } + + messagesPending.Inc() + bytesPending.Add(float64(len(sr.msg.Payload))) + + p.publishSemaphore.Acquire() p.eventsChan <- sr } @@ -426,6 +469,8 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) // The ack was indeed for the expected item in the queue, we can remove it and trigger the callback p.pendingQueue.Poll() + now := time.Now().UnixNano() + // lock the pending item while sending the requests pi.Lock() defer pi.Unlock() @@ -434,6 +479,13 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) if sr.msg != nil { atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID)) p.publishSemaphore.Release() + + publishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9) + messagesPublished.Inc() + messagesPending.Dec() + payloadSize := float64(len(sr.msg.Payload)) + bytesPublished.Add(payloadSize) + bytesPending.Sub(payloadSize) } if sr.callback != nil { @@ -516,6 +568,7 @@ type sendRequest struct { ctx context.Context msg *ProducerMessage callback func(MessageID, *ProducerMessage, error) + publishTime time.Time flushImmediately bool } diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index b74b35b..d97cc96 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -20,6 +20,9 @@ package pulsar import ( "context" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + log "github.com/sirupsen/logrus" ) @@ -27,6 +30,18 @@ const ( defaultReceiverQueueSize = 1000 ) +var ( + readersOpened = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_readers_opened", + Help: "Counter of readers created by the client", + }) + + readersClosed = promauto.NewCounter(prometheus.CounterOpts{ + Name: "pulsar_client_readers_closed", + Help: "Counter of readers closed by the client", + }) +) + type reader struct { pc *partitionConsumer messageCh chan ConsumerMessage @@ -101,6 +116,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { } reader.pc = pc + readersOpened.Inc() return reader, nil } @@ -162,4 +178,5 @@ func (r *reader) hasMoreMessages() bool { func (r *reader) Close() { r.pc.Close() + readersClosed.Inc() }