Copilot commented on code in PR #1468:
URL: https://github.com/apache/pulsar-client-go/pull/1468#discussion_r2963686445
##########
pulsar/internal/rpc_client.go:
##########
@@ -232,39 +232,38 @@ func (c *rpcClient) LookupService(URL string)
(LookupService, error) {
return lookupService, nil
}
- serviceURL, err := url.Parse(URL)
- if err != nil {
- return nil, fmt.Errorf("failed to parse URL '%s': %w", URL, err)
- }
-
- lookupService, err = c.NewLookupService(serviceURL)
+ lookupService, err := c.newLookupService(URL)
if err != nil {
return nil, fmt.Errorf("failed to create lookup service for URL
'%s': %w", URL, err)
}
c.urlLookupServiceMap[URL] = lookupService
return lookupService, nil
}
-func (c *rpcClient) NewLookupService(url *url.URL) (LookupService, error) {
+func (c *rpcClient) newLookupService(serviceURL string) (LookupService, error)
{
+ serviceURI, err := NewPulsarServiceURIFromURIString(serviceURL)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse service URL '%s': %w",
serviceURL, err)
+ }
- switch url.Scheme {
- case "pulsar", "pulsar+ssl":
- serviceNameResolver := NewPulsarServiceNameResolver(url)
- return NewLookupService(c, url, serviceNameResolver,
- c.tlsConfig != nil, c.listenerName, c.lookupProperties,
c.log, c.metrics), nil
- case "http", "https":
- serviceNameResolver := NewPulsarServiceNameResolver(url)
- httpClient, err := NewHTTPClient(url, serviceNameResolver,
c.tlsConfig,
+ serviceNameResolver, err := NewPulsarServiceNameResolver(serviceURL)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create service name resolver
for URL '%s': %w", serviceURL, err)
+ }
Review Comment:
`newLookupService` parses `serviceURL` into a `PulsarServiceURI`, then
immediately constructs a `ServiceNameResolver` which parses the same string
again in `UpdateServiceURL`. This is unnecessary duplication on a hot path (and
slightly increases risk of parse behavior diverging). Consider constructing the
resolver first and using `serviceNameResolver.GetServiceURI().IsHTTP()` (or
similar) to decide between HTTP vs binary lookup services, removing the extra
parse.
##########
pulsar/internal/service_uri.go:
##########
@@ -55,153 +66,198 @@ func NewPulsarServiceURIFromURIString(uri string)
(*PulsarServiceURI, error) {
return u, nil
}
-func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error) {
- u, err := fromURL(url)
- if err != nil {
- log.Error(err)
- return nil, err
+func (p *PulsarServiceURI) UseTLS() bool {
+ return p.ServiceName == HTTPSService || slices.Contains(p.ServiceInfos,
SSLService)
+}
+
+func (p *PulsarServiceURI) PrimaryHostName() (string, error) {
+ if len(p.ServiceHosts) > 0 {
+ host, _, err := net.SplitHostPort(p.ServiceHosts[0])
+ if err != nil {
+ return "", err
+ }
+ return host, nil
}
- return u, nil
+
+ return "", errors.New("no hosts available in ServiceHosts")
}
-func fromString(uriStr string) (*PulsarServiceURI, error) {
- if uriStr == "" || len(uriStr) == 0 {
- return nil, errors.New("service uriStr string is null")
- }
- if strings.Contains(uriStr, "[") && strings.Contains(uriStr, "]") {
- // deal with ipv6 address
- hosts := strings.FieldsFunc(uriStr, splitURI)
- if len(hosts) > 1 {
- // deal with ipv6 address
- firstHost := hosts[0]
- lastHost := hosts[len(hosts)-1]
- hasPath := strings.Contains(lastHost, "/")
- path := ""
- if hasPath {
- idx := strings.Index(lastHost, "/")
- path = lastHost[idx:]
- }
- firstHost += path
- url, err := url.Parse(firstHost)
- if err != nil {
- return nil, err
- }
- serviceURI, err := fromURL(url)
- if err != nil {
- return nil, err
- }
- var mHosts []string
- var multiHosts []string
- mHosts = append(mHosts, serviceURI.ServiceHosts[0])
- mHosts = append(mHosts, hosts[1:]...)
-
- for _, v := range mHosts {
- h, err :=
validateHostName(serviceURI.ServiceName, serviceURI.ServiceInfos, v)
- if err == nil {
- multiHosts = append(multiHosts, h)
- } else {
- return nil, err
- }
- }
+func (p *PulsarServiceURI) IsHTTP() bool {
+ return p.ServiceName == HTTPService || p.ServiceName == HTTPSService
+}
- return &PulsarServiceURI{
- serviceURI.ServiceName,
- serviceURI.ServiceInfos,
- multiHosts,
- serviceURI.servicePath,
- serviceURI.URL,
- }, nil
- }
+func fromString(uriStr string) (*PulsarServiceURI, error) {
+ if uriStr == "" {
+ return nil, errors.New("service URI cannot be empty")
}
- url, err := url.Parse(uriStr)
+ // 1. Reduce a multi-host URI to one parseable host while preserving
suffixes.
+ singleHostURI, additionalHosts := splitHostURI(uriStr)
+
+ // 2. Parse single-host URI ONLY
+ u, err := url.Parse(singleHostURI)
if err != nil {
return nil, err
}
- return fromURL(url)
-}
+ if u.Host == "" {
+ return nil, errors.New("service host cannot be empty")
+ }
-func fromURL(url *url.URL) (*PulsarServiceURI, error) {
- if url == nil {
- return nil, errors.New("service url instance is null")
+ // 3. Parse scheme
+ scheme := strings.ToLower(u.Scheme)
+ if scheme == "" {
+ return nil, errors.New("service scheme cannot be empty")
}
- if url.Host == "" || len(url.Host) == 0 {
- return nil, errors.New("service host is null")
+ schemeParts := strings.Split(scheme, "+")
+ serviceName := schemeParts[0]
+ serviceInfos := schemeParts[1:]
+
+ // reject unknown scheme
+ switch serviceName {
+ case BinaryService, HTTPService, HTTPSService:
+ default:
+ return nil, &UnsupportedServiceNameError{ServiceName:
serviceName}
}
- var serviceName string
- var serviceInfos []string
- scheme := url.Scheme
- if scheme != "" {
- scheme = strings.ToLower(scheme)
- schemeParts := strings.Split(scheme, "+")
- serviceName = schemeParts[0]
- serviceInfos = schemeParts[1:]
+ // 4. Validate first host
+ firstHost, err := validateHostName(serviceName, serviceInfos, u.Host)
+ if err != nil {
+ return nil, err
}
- var serviceHosts []string
- hosts := strings.FieldsFunc(url.Host, splitURI)
- for _, v := range hosts {
- h, err := validateHostName(serviceName, serviceInfos, v)
- if err == nil {
- serviceHosts = append(serviceHosts, h)
- } else {
- return nil, err
+ serviceHosts := []string{firstHost}
+
+ // 5. Validate remaining hosts
+ if additionalHosts != "" {
+ for _, h := range strings.FieldsFunc(additionalHosts, splitURI)
{
+ host, err := validateHostName(serviceName,
serviceInfos, h)
+ if err != nil {
+ return nil, err
+ }
+ serviceHosts = append(serviceHosts, host)
}
}
return &PulsarServiceURI{
- serviceName,
- serviceInfos,
- serviceHosts,
- url.Path,
- url,
+ ServiceName: serviceName,
+ ServiceInfos: serviceInfos,
+ ServiceHosts: serviceHosts,
+ servicePath: u.Path,
+ URL: u,
}, nil
}
func splitURI(r rune) bool {
return r == ',' || r == ';'
}
+func splitHostURI(uriStr string) (string, string) {
+ authorityStart := strings.Index(uriStr, "//")
+ if authorityStart < 0 {
+ return uriStr, ""
+ }
+ authorityStart += 2
+
+ authorityEnd := len(uriStr)
+ if authoritySuffixEnd := strings.IndexAny(uriStr[authorityStart:],
"/?#"); authoritySuffixEnd >= 0 {
+ authorityEnd = authorityStart + authoritySuffixEnd
+ }
+
+ hostListStart := authorityStart
+ if userInfoEnd :=
strings.LastIndex(uriStr[authorityStart:authorityEnd], "@"); userInfoEnd >= 0 {
+ hostListStart += userInfoEnd + 1
+ }
+
+ if firstDelim := strings.IndexFunc(uriStr[hostListStart:authorityEnd],
splitURI); firstDelim >= 0 {
+ firstDelimIdx := hostListStart + firstDelim
+ return uriStr[:firstDelimIdx] + uriStr[authorityEnd:],
uriStr[firstDelimIdx+1 : authorityEnd]
+ }
+
+ return uriStr, ""
+}
+
func validateHostName(serviceName string, serviceInfos []string, hostname
string) (string, error) {
- uri, err := url.Parse("dummyscheme://" + hostname)
+ host, port, err := splitHostPortOrDefault(serviceName, serviceInfos,
hostname)
if err != nil {
return "", err
}
- host := uri.Hostname()
- if strings.Contains(hostname, "[") && strings.Contains(hostname, "]") {
- host = fmt.Sprintf("[%s]", host)
- }
- if host == "" || uri.Scheme == "" {
- return "", errors.New("Invalid hostname : " + hostname)
+
+ cleanHost, err := normalizeValidatedHost(hostname, host)
+ if err != nil {
+ return "", err
}
- port := uri.Port()
- if uri.Port() == "" {
- p := getServicePort(serviceName, serviceInfos)
- if p == -1 {
- return "", fmt.Errorf("invalid port : %d", p)
+ return net.JoinHostPort(cleanHost, port), nil
+}
+
+func splitHostPortOrDefault(serviceName string, serviceInfos []string,
hostname string) (string, string, error) {
+ // net.SplitHostPort enforces strict host:port syntax:
+ // - IPv4: "127.0.0.1:6650"
+ // - IPv6: "[fec0::1]:6650"
+ //
+ // It will fail for:
+ // - hosts without a port
+ // - bare IPv6 literals without brackets (e.g. "fec0::1")
+ host, port, err := net.SplitHostPort(hostname)
+ if err == nil {
+ if host == "" {
+ // net.SplitHostPort accepts ":port" with an empty
host, but we explicitly
+ // reject such inputs because a non-empty hostname is
required.
+ return "", "", fmt.Errorf("invalid address: host is
empty in %q", hostname)
}
- port = fmt.Sprint(p)
+ return host, port, nil
}
- result := host + ":" + port
- _, _, err = net.SplitHostPort(result)
- if err != nil {
- return "", err
+
+ // If the hostname contains ':' but is not bracketed, it is very likely
+ // an invalid IPv6 literal or an invalid host with too many colons.
+ //
+ // Examples rejected here:
+ // - "fec0::1"
+ // - "fec0::1:6650"
+ // - "localhost:6650:6651"
+ if strings.Contains(hostname, ":") && !strings.HasPrefix(hostname, "[")
{
+ return "", "", fmt.Errorf("invalid address (maybe missing
brackets for IPv6 or too many colons): %s", hostname)
+ }
+
+ defaultPort := getServicePort(serviceName, serviceInfos)
+ if defaultPort == -1 {
+ return "", "", fmt.Errorf("no port found")
+ }
+
+ return hostname, strconv.Itoa(defaultPort), nil
+}
+
+func normalizeValidatedHost(hostname, host string) (string, error) {
+ hasOpeningBracket := strings.HasPrefix(hostname, "[")
+ hasClosingBracket := strings.Contains(hostname, "]")
+ if hasOpeningBracket != hasClosingBracket {
+ return "", fmt.Errorf("invalid bracketed host: %s", hostname)
+ }
+
+ cleanHost := strings.TrimPrefix(strings.TrimSuffix(host, "]"), "[")
+ if !hasOpeningBracket {
+ return cleanHost, nil
+ }
+
+ addr, err := netip.ParseAddr(cleanHost)
+ if err != nil || !addr.Is6() {
+ return "", fmt.Errorf("invalid IPv6 address: %s", hostname)
}
- return result, nil
+
+ return cleanHost, nil
}
func getServicePort(serviceName string, serviceInfos []string) int {
- switch strings.ToLower(serviceName) {
+ switch serviceName {
case BinaryService:
- if len(serviceInfos) == 0 {
- return BinaryPort
- } else if len(serviceInfos) == 1 &&
strings.ToLower(serviceInfos[0]) == SSLService {
+ if slices.ContainsFunc(serviceInfos, func(s string) bool {
+ return strings.ToLower(s) == SSLService
+ }) {
return BinaryTLSPort
}
+ return BinaryPort
case HTTPService:
return HTTPPort
case HTTPSService:
Review Comment:
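`getServicePort` now ignores unknown scheme modifiers for `pulsar` (it
returns `BinaryPort` unless it finds `ssl` anywhere). This changes behavior vs.
the previous logic, which rejected unexpected modifiers (e.g. `pulsar+foo://...`)
by making port resolution fail, and can silently accept
typos/misconfigurations. It would be safer to validate `serviceInfos` (e.g. allow
only empty or exactly `ssl` for `pulsar`, and no modifiers for `http/https`)
and return an error for anything else. Note that `getServicePort` is only
consulted when a host omits its port, so doing this validation in `fromString`
(next to the unknown-scheme check) would also reject URIs with explicit ports
such as `pulsar+foo://host:6650`, and would avoid surfacing the misleading
`no port found` error for a bad modifier.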
```suggestion
// For Pulsar, only the "ssl" modifier is allowed. Any other
non-empty
// modifier is treated as invalid and causes port resolution to
fail.
if len(serviceInfos) == 0 {
return BinaryPort
}
hasSSL := false
for _, info := range serviceInfos {
if info == "" {
// Ignore empty modifiers if present.
continue
}
if strings.EqualFold(info, SSLService) {
hasSSL = true
continue
}
// Unknown modifier: reject to avoid silently accepting
typos.
return -1
}
if hasSSL {
return BinaryTLSPort
}
return BinaryPort
case HTTPService:
// HTTP should not have any scheme modifiers; reject if present.
if len(serviceInfos) != 0 {
return -1
}
return HTTPPort
case HTTPSService:
// HTTPS should not have any scheme modifiers; reject if
present.
if len(serviceInfos) != 0 {
return -1
}
```
##########
pulsar/internal/service_uri.go:
##########
@@ -55,153 +66,198 @@ func NewPulsarServiceURIFromURIString(uri string)
(*PulsarServiceURI, error) {
return u, nil
}
-func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error) {
- u, err := fromURL(url)
- if err != nil {
- log.Error(err)
- return nil, err
+func (p *PulsarServiceURI) UseTLS() bool {
+ return p.ServiceName == HTTPSService || slices.Contains(p.ServiceInfos,
SSLService)
+}
+
+func (p *PulsarServiceURI) PrimaryHostName() (string, error) {
+ if len(p.ServiceHosts) > 0 {
+ host, _, err := net.SplitHostPort(p.ServiceHosts[0])
+ if err != nil {
+ return "", err
+ }
+ return host, nil
}
- return u, nil
+
+ return "", errors.New("no hosts available in ServiceHosts")
}
-func fromString(uriStr string) (*PulsarServiceURI, error) {
- if uriStr == "" || len(uriStr) == 0 {
- return nil, errors.New("service uriStr string is null")
- }
- if strings.Contains(uriStr, "[") && strings.Contains(uriStr, "]") {
- // deal with ipv6 address
- hosts := strings.FieldsFunc(uriStr, splitURI)
- if len(hosts) > 1 {
- // deal with ipv6 address
- firstHost := hosts[0]
- lastHost := hosts[len(hosts)-1]
- hasPath := strings.Contains(lastHost, "/")
- path := ""
- if hasPath {
- idx := strings.Index(lastHost, "/")
- path = lastHost[idx:]
- }
- firstHost += path
- url, err := url.Parse(firstHost)
- if err != nil {
- return nil, err
- }
- serviceURI, err := fromURL(url)
- if err != nil {
- return nil, err
- }
- var mHosts []string
- var multiHosts []string
- mHosts = append(mHosts, serviceURI.ServiceHosts[0])
- mHosts = append(mHosts, hosts[1:]...)
-
- for _, v := range mHosts {
- h, err :=
validateHostName(serviceURI.ServiceName, serviceURI.ServiceInfos, v)
- if err == nil {
- multiHosts = append(multiHosts, h)
- } else {
- return nil, err
- }
- }
+func (p *PulsarServiceURI) IsHTTP() bool {
+ return p.ServiceName == HTTPService || p.ServiceName == HTTPSService
+}
- return &PulsarServiceURI{
- serviceURI.ServiceName,
- serviceURI.ServiceInfos,
- multiHosts,
- serviceURI.servicePath,
- serviceURI.URL,
- }, nil
- }
+func fromString(uriStr string) (*PulsarServiceURI, error) {
+ if uriStr == "" {
+ return nil, errors.New("service URI cannot be empty")
}
- url, err := url.Parse(uriStr)
+ // 1. Reduce a multi-host URI to one parseable host while preserving
suffixes.
+ singleHostURI, additionalHosts := splitHostURI(uriStr)
+
+ // 2. Parse single-host URI ONLY
+ u, err := url.Parse(singleHostURI)
if err != nil {
return nil, err
}
- return fromURL(url)
-}
+ if u.Host == "" {
+ return nil, errors.New("service host cannot be empty")
+ }
-func fromURL(url *url.URL) (*PulsarServiceURI, error) {
- if url == nil {
- return nil, errors.New("service url instance is null")
+ // 3. Parse scheme
+ scheme := strings.ToLower(u.Scheme)
+ if scheme == "" {
+ return nil, errors.New("service scheme cannot be empty")
}
- if url.Host == "" || len(url.Host) == 0 {
- return nil, errors.New("service host is null")
+ schemeParts := strings.Split(scheme, "+")
+ serviceName := schemeParts[0]
+ serviceInfos := schemeParts[1:]
+
+ // reject unknown scheme
+ switch serviceName {
+ case BinaryService, HTTPService, HTTPSService:
+ default:
+ return nil, &UnsupportedServiceNameError{ServiceName:
serviceName}
}
- var serviceName string
- var serviceInfos []string
- scheme := url.Scheme
- if scheme != "" {
- scheme = strings.ToLower(scheme)
- schemeParts := strings.Split(scheme, "+")
- serviceName = schemeParts[0]
- serviceInfos = schemeParts[1:]
+ // 4. Validate first host
+ firstHost, err := validateHostName(serviceName, serviceInfos, u.Host)
+ if err != nil {
+ return nil, err
}
- var serviceHosts []string
- hosts := strings.FieldsFunc(url.Host, splitURI)
- for _, v := range hosts {
- h, err := validateHostName(serviceName, serviceInfos, v)
- if err == nil {
- serviceHosts = append(serviceHosts, h)
- } else {
- return nil, err
+ serviceHosts := []string{firstHost}
+
+ // 5. Validate remaining hosts
+ if additionalHosts != "" {
+ for _, h := range strings.FieldsFunc(additionalHosts, splitURI)
{
+ host, err := validateHostName(serviceName,
serviceInfos, h)
+ if err != nil {
+ return nil, err
+ }
+ serviceHosts = append(serviceHosts, host)
}
}
return &PulsarServiceURI{
- serviceName,
- serviceInfos,
- serviceHosts,
- url.Path,
- url,
+ ServiceName: serviceName,
+ ServiceInfos: serviceInfos,
+ ServiceHosts: serviceHosts,
+ servicePath: u.Path,
+ URL: u,
}, nil
}
func splitURI(r rune) bool {
return r == ',' || r == ';'
}
+func splitHostURI(uriStr string) (string, string) {
+ authorityStart := strings.Index(uriStr, "//")
+ if authorityStart < 0 {
+ return uriStr, ""
+ }
+ authorityStart += 2
+
+ authorityEnd := len(uriStr)
+ if authoritySuffixEnd := strings.IndexAny(uriStr[authorityStart:],
"/?#"); authoritySuffixEnd >= 0 {
+ authorityEnd = authorityStart + authoritySuffixEnd
+ }
+
+ hostListStart := authorityStart
+ if userInfoEnd :=
strings.LastIndex(uriStr[authorityStart:authorityEnd], "@"); userInfoEnd >= 0 {
+ hostListStart += userInfoEnd + 1
+ }
+
+ if firstDelim := strings.IndexFunc(uriStr[hostListStart:authorityEnd],
splitURI); firstDelim >= 0 {
+ firstDelimIdx := hostListStart + firstDelim
+ return uriStr[:firstDelimIdx] + uriStr[authorityEnd:],
uriStr[firstDelimIdx+1 : authorityEnd]
+ }
+
+ return uriStr, ""
+}
+
func validateHostName(serviceName string, serviceInfos []string, hostname
string) (string, error) {
- uri, err := url.Parse("dummyscheme://" + hostname)
+ host, port, err := splitHostPortOrDefault(serviceName, serviceInfos,
hostname)
if err != nil {
return "", err
}
- host := uri.Hostname()
- if strings.Contains(hostname, "[") && strings.Contains(hostname, "]") {
- host = fmt.Sprintf("[%s]", host)
- }
- if host == "" || uri.Scheme == "" {
- return "", errors.New("Invalid hostname : " + hostname)
+
+ cleanHost, err := normalizeValidatedHost(hostname, host)
+ if err != nil {
+ return "", err
}
- port := uri.Port()
- if uri.Port() == "" {
- p := getServicePort(serviceName, serviceInfos)
- if p == -1 {
- return "", fmt.Errorf("invalid port : %d", p)
+ return net.JoinHostPort(cleanHost, port), nil
+}
+
+func splitHostPortOrDefault(serviceName string, serviceInfos []string,
hostname string) (string, string, error) {
+ // net.SplitHostPort enforces strict host:port syntax:
+ // - IPv4: "127.0.0.1:6650"
+ // - IPv6: "[fec0::1]:6650"
+ //
+ // It will fail for:
+ // - hosts without a port
+ // - bare IPv6 literals without brackets (e.g. "fec0::1")
+ host, port, err := net.SplitHostPort(hostname)
+ if err == nil {
+ if host == "" {
+ // net.SplitHostPort accepts ":port" with an empty
host, but we explicitly
+ // reject such inputs because a non-empty hostname is
required.
+ return "", "", fmt.Errorf("invalid address: host is
empty in %q", hostname)
}
- port = fmt.Sprint(p)
+ return host, port, nil
}
- result := host + ":" + port
- _, _, err = net.SplitHostPort(result)
- if err != nil {
- return "", err
+
+ // If the hostname contains ':' but is not bracketed, it is very likely
+ // an invalid IPv6 literal or an invalid host with too many colons.
+ //
+ // Examples rejected here:
+ // - "fec0::1"
+ // - "fec0::1:6650"
+ // - "localhost:6650:6651"
+ if strings.Contains(hostname, ":") && !strings.HasPrefix(hostname, "[")
{
+ return "", "", fmt.Errorf("invalid address (maybe missing
brackets for IPv6 or too many colons): %s", hostname)
Review Comment:
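`splitHostPortOrDefault` treats any hostname containing ':' that isn't
bracket-prefixed as an IPv6/too-many-colons case. That also catches
single-colon cases like `localhost:xyz` / `localhost:-1` / `localhost:`, so
invalid-port errors are reported with the misleading 'maybe missing brackets
for IPv6' message. Consider triggering this branch only when there are
multiple colons (e.g., `strings.Count(hostname, ":") > 1`). Otherwise, for
unbracketed `host:port` inputs, return or augment the original
`net.SplitHostPort` error so invalid ports are reported correctly.

Bracketed IPv6 literals without a port (e.g. `[fec0::1]`) must still fall
through to the default-port path. `net.SplitHostPort` fails on them with
"missing port in address". The suggestion below returns an error for any
hostname containing ':', so it would reject them. Please exclude that case
before applying it.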
```suggestion
// If the hostname contains ':' but is not bracketed and has multiple
colons,
// it is very likely an invalid IPv6 literal or an invalid host with
too many colons.
//
// Examples rejected here:
// - "fec0::1"
// - "fec0::1:6650"
// - "localhost:6650:6651"
if strings.Contains(hostname, ":") {
if !strings.HasPrefix(hostname, "[") && strings.Count(hostname,
":") > 1 {
return "", "", fmt.Errorf("invalid address (maybe
missing brackets for IPv6 or too many colons): %s", hostname)
}
// For single-colon host:port inputs or bracketed addresses,
return the
// underlying SplitHostPort error so invalid ports and syntax
are reported
// accurately.
return "", "", fmt.Errorf("invalid address %q: %w", hostname,
err)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]