[GitHub] yush1ga commented on issue #820: Added setting for anonymous user role
yush1ga commented on issue #820: Added setting for anonymous user role URL: https://github.com/apache/incubator-pulsar/pull/820#issuecomment-357154798 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yush1ga commented on issue #820: Added setting for anonymous user role
yush1ga commented on issue #820: Added setting for anonymous user role URL: https://github.com/apache/incubator-pulsar/pull/820#issuecomment-357154798 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yush1ga commented on issue #899: Add subscription auth mode by prefix
yush1ga commented on issue #899: Add subscription auth mode by prefix URL: https://github.com/apache/incubator-pulsar/pull/899#issuecomment-357152274 @saandrews @merlimat Thank you for comments. I fixed things mentioned above. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yush1ga commented on issue #899: Add subscription auth mode by prefix
yush1ga commented on issue #899: Add subscription auth mode by prefix URL: https://github.com/apache/incubator-pulsar/pull/899#issuecomment-357144485 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] yush1ga commented on issue #899: Add subscription auth mode by prefix
yush1ga commented on issue #899: Add subscription auth mode by prefix URL: https://github.com/apache/incubator-pulsar/pull/899#issuecomment-357144485 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zhaijack opened a new pull request #1059: Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli)
zhaijack opened a new pull request #1059: Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli) URL: https://github.com/apache/incubator-pulsar/pull/1059 ### Motivation We learned from our customers that 'global-zookeeper' is a bit confusing. people feels that the global zookeeper is the single point of failure, when it is down, pulsar is not functionable. since it is mostly used for configuration/policies, it is better to rename it to make the meaning more clear, so people will know directly that it is used for storing configurations/policies and they will not be freak out, or having other bad impressions. It would be good to rename "global zookeeper" to "configuration store". ### Modifications - rename "global zookeeper" to "configuration store" in code and cli command. - mark old "global zookeeper" as "Deprecated". - Add new "configuration store" to replace "global zookeeper". ### Result Since old "global zookeeper" is marked as deprecated, so it still OK to run as old way. Maybe finally removed in next release. Encourage user to use new "configuration store". This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jai1 commented on a change in pull request #1002: Making Pulsar Proxy more secure
jai1 commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161127527 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -98,7 +98,7 @@ private String clientVersion = null; private int nonPersistentPendingMessages = 0; private final int MaxNonPersistentPendingMessages; -private String originalPrincipal; +private String proxyClientAuthRole = null; Review comment: Three reasons:- - Principal is specific to athens - we traditionally used auth*Role* to store the auth data - proxyClient is more specific than original This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1038: ENH: Add C++ code auto-formatting option
rdhabalia commented on a change in pull request #1038: ENH: Add C++ code auto-formatting option URL: https://github.com/apache/incubator-pulsar/pull/1038#discussion_r161117396 ## File path: pulsar-client-cpp/build-support/run_clang_format.py ## @@ -0,0 +1,80 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Original: https://github.com/apache/arrow/blob/4dbce607d50031a405af39d36e08cd03c5ffc764/cpp/build-support/run_clang_format.py +# ChangeLog: +# 2018-01-08: Accept multiple source directories (@Licht-T) Review comment: I think we can track it with commits so, should we remove the changelog? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Licht-T commented on issue #1038: ENH: Add C++ code auto-formatting option
Licht-T commented on issue #1038: ENH: Add C++ code auto-formatting option URL: https://github.com/apache/incubator-pulsar/pull/1038#issuecomment-357104740 @maskit Okay! I'll create a new PR! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] Licht-T commented on issue #1038: ENH: Add C++ code auto-formatting option
Licht-T commented on issue #1038: ENH: Add C++ code auto-formatting option URL: https://github.com/apache/incubator-pulsar/pull/1038#issuecomment-357104427 @merlimat Seems that `clang-format-5.0` is installed by Homebrew. You can try this by changing the version setting to 5.0 or downloading the 4.0 pre-build binary. https://github.com/apache/incubator-pulsar/pull/1038/files#diff-bda8db832e446d092d38464123ed17bcR201 http://releases.llvm.org/download.html This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on issue #1054: Populate message properties after building a message
rdhabalia commented on issue #1054: Populate message properties after building a message URL: https://github.com/apache/incubator-pulsar/pull/1054#issuecomment-357085801 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on issue #1054: Populate message properties after building a message
rdhabalia commented on issue #1054: Populate message properties after building a message URL: https://github.com/apache/incubator-pulsar/pull/1054#issuecomment-357085801 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161097024 ## File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java ## @@ -70,6 +70,19 @@ public boolean canProduce(DestinationName destination, String role) throws Excep } } +/** + * Check if the specified role has permission to access the destination via a proxy + * + * @param destination + *the fully qualified destination name associated with the destination. + * @param role + *the app id used to receive messages from the destination. + */ +public CompletableFuture canProxyAsync(DestinationName destination, String role) { +return checkAuthorization(destination, role, AuthAction.proxy); Review comment: instead `AuthAction`, as @merlimat mentioned earlier, should we pass `proxy` as a flag to authorize request? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161094006 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java ## @@ -35,6 +35,14 @@ // ZooKeeper session timeout private int zookeeperSessionTimeoutMs = 30_000; +// If Discovery Service is Disabled the proxy will just authenticate the client +// and forward all requests to a VIP or any other service discovery port +private boolean discoveryServiceEnabled = true; Review comment: I think we might not need this flag? If global-zk address is not provided then proxy should fall to provided discovery-url? similar as what we have done for websocket `WebSocketProxyConfiguration.java`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161093519 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupViaServiceUrl.java ## @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.proxy.server; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; +import java.net.URISyntaxException; + +import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata; +import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; +import org.apache.pulsar.common.naming.DestinationName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.prometheus.client.Counter; + +public class LookupViaServiceUrl implements LookupProxyHandler { Review comment: Actually `LookupViaServiceUrl` and `LookupWithDiscoveryServiceHandler` have most of the logic in common and only difference it requires is "service-url". Right now, discovery service provides next-broker url and we want to add lookup using broker-service url also. So, I think we don't need different implementation of `LookupProxyHandler` but we need provider that returns broker-url based on discovery-service/configured service-url and things will remain same in this `LookupProxyHandler`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161096620 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java ## @@ -170,7 +170,11 @@ protected void handleConnect(CommandConnect connect) { // Client is doing a lookup, we can consider the handshake complete and we'll take care of just topics and // partitions metadata lookups state = State.ProxyLookupRequests; -lookupProxyHandler = new LookupProxyHandler(service, this); +if (service.getConfiguration().isDiscoveryServiceEnabled()) { Review comment: is mentioned above, should we derive it based on global-zk value present or not? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161081263 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -174,77 +174,115 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); -final String topic = lookup.getTopic(); +final String topicName = lookup.getTopic(); if (log.isDebugEnabled()) { -log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId); -} -final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); -if (lookupSemaphore.tryAcquire()) { -lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), -getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { -if (ex == null) { -ctx.writeAndFlush(lookupResponse); -} else { -// it should never happen -log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); -ctx.writeAndFlush( - newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); -} -lookupSemaphore.release(); -return null; -}); -} else { -if (log.isDebugEnabled()) { -log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topic); -} - ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests, -"Failed due to too many pending lookup requests", requestId)); +log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId); } - +final String proxyClientAuthRole = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.proxyClientAuthRole; Review comment: can't we use `OriginalPrincipal` that broker received on `handleConnect()`? is this different auth-role? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161078193 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -98,7 +98,7 @@ private String clientVersion = null; private int nonPersistentPendingMessages = 0; private final int MaxNonPersistentPendingMessages; -private String originalPrincipal; +private String proxyClientAuthRole = null; Review comment: Is there any reason for renaming `originalPrincipal`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161096455 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java ## @@ -35,6 +35,14 @@ // ZooKeeper session timeout private int zookeeperSessionTimeoutMs = 30_000; +// If Discovery Service is Disabled the proxy will just authenticate the client +// and forward all requests to a VIP or any other service discovery port +private boolean discoveryServiceEnabled = true; + +// if Service Discovery is Disabled this url should point to the discovery service provider. +private String discoveryServiceURL = "pulsar://localhost:6650/"; Review comment: should we rename it to `brokerServiceUrl` as in this case proxy will directly try to communicate broker with brokerserviceUrl? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161081553 ## File path: pulsar-common/src/main/proto/PulsarApi.proto ## @@ -186,6 +187,7 @@ message CommandSubscribe { message CommandPartitionedTopicMetadata { required string topic= 1; required uint64 request_id = 2; +optional string original_principal = 3; Review comment: formatting.. and same here, can't we use `original_principal` received from `CommandConnect`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161093744 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java ## @@ -35,6 +35,14 @@ // ZooKeeper session timeout private int zookeeperSessionTimeoutMs = 30_000; +// If Discovery Service is Disabled the proxy will just authenticate the client +// and forward all requests to a VIP or any other service discovery port +private boolean discoveryServiceEnabled = true; + +// if Service Discovery is Disabled this url should point to the discovery service provider. +private String discoveryServiceURL = "pulsar://localhost:6650/"; Review comment: should we avoid adding default value as we want to add validation when proxy-server starts? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure
rdhabalia commented on a change in pull request #1002: Making Pulsar Proxy more secure URL: https://github.com/apache/incubator-pulsar/pull/1002#discussion_r161078797 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java ## @@ -174,77 +174,115 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); -final String topic = lookup.getTopic(); +final String topicName = lookup.getTopic(); if (log.isDebugEnabled()) { -log.debug("[{}] Received Lookup from {} for {}", topic, remoteAddress, requestId); -} -final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); -if (lookupSemaphore.tryAcquire()) { -lookupDestinationAsync(getBrokerService().pulsar(), DestinationName.get(topic), lookup.getAuthoritative(), -getRole(), lookup.getRequestId()).handle((lookupResponse, ex) -> { -if (ex == null) { -ctx.writeAndFlush(lookupResponse); -} else { -// it should never happen -log.warn("[{}] lookup failed with error {}, {}", remoteAddress, topic, ex.getMessage(), ex); -ctx.writeAndFlush( - newLookupErrorResponse(ServerError.ServiceNotReady, ex.getMessage(), requestId)); -} -lookupSemaphore.release(); -return null; -}); -} else { -if (log.isDebugEnabled()) { -log.debug("[{}] Failed lookup due to too many lookup-requests {}", remoteAddress, topic); -} - ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests, -"Failed due to too many pending lookup requests", requestId)); +log.debug("[{}] Received Lookup from {} for {}", topicName, remoteAddress, requestId); } - +final String proxyClientAuthRole = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.proxyClientAuthRole; +CompletableFuture isProxyAuthorizedFuture = isProxyAuthorized(topicName, proxyClientAuthRole); + +isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> { +if (isProxyAuthorized) { +final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); Review comment: I think we should apply throttling before doing any action. So, can we move it to the beginning as before? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng commented on issue #1058: Provide a mechanism to tell whether a consumer is the leader of a failover subscription
jerrypeng commented on issue #1058: Provide a mechanism to tell whether a consumer is the leader of a failover subscription URL: https://github.com/apache/incubator-pulsar/issues/1058#issuecomment-357082837 +1 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sijie opened a new issue #1058: Provide a mechanism to tell whether a consumer is the leader of a failover subscription
sijie opened a new issue #1058: Provide a mechanism to tell whether a consumer is the leader of a failover subscription URL: https://github.com/apache/incubator-pulsar/issues/1058 `Failover` subscription is sort of providing a `leader-election` mechanism over multiple consumers within a failover subscription. It would be good to provide a mechanism to expose state changes (e.g. become leader, become standay) to the consumer, so applications can use this information to determine if a consumer is the leader consumer or not. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1054: Populate message properties after building a message
rdhabalia commented on a change in pull request #1054: Populate message properties after building a message URL: https://github.com/apache/incubator-pulsar/pull/1054#discussion_r161062602 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java ## @@ -75,12 +81,8 @@ static MessageImpl create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer this.payload = Unpooled.copiedBuffer(payload); if (msgMetadata.getPropertiesCount() > 0) { -Map properties = Maps.newTreeMap(); -for (KeyValue entry : msgMetadata.getPropertiesList()) { -properties.put(entry.getKey(), entry.getValue()); -} - -this.properties = Collections.unmodifiableMap(properties); +this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream() Review comment: > I think this might also affect the replicator messages, , Maybe we could set properties to null and do a lazy population Make sense. fixed it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12
sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12 URL: https://github.com/apache/incubator-pulsar/pull/1056#issuecomment-357036886 @merlimat When flushing writing multiple messages they are sort of queued inside a `ChannelOutboundBuffer` which is a sort of linked list, this could cause allocations as well, I really don't know which is better. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12
sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12 URL: https://github.com/apache/incubator-pulsar/pull/1056#issuecomment-357043076 I think it's more troublesome to have it as a holder, we would have to change a lot of files with probably little benefit. The changes I made here should be the same or slightly better than previously This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12
sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12 URL: https://github.com/apache/incubator-pulsar/pull/1056#issuecomment-357036886 @merlimat When flushing writing multiple messages they are sort of queued inside a `ChannelOutboundBuffer` which is a sort of linked list, this could cause [allocations](https://github.com/netty/netty/blob/4.1/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java#L113) as well, I really don't know which is better. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12
sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12 URL: https://github.com/apache/incubator-pulsar/pull/1056#issuecomment-357036886 @merlimat When flushing writing multiple messages they are sort of queued inside a `ChannelOutboundBuffer` which is a sort of linked list, this causes [allocations](https://github.com/netty/netty/blob/4.1/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java#L113) as well, I really don't know which is better. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12
sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12 URL: https://github.com/apache/incubator-pulsar/pull/1056#issuecomment-357036886 @merlimat When flushing writing multiple messages they are sort of queued inside a `ChannelOutboundBuffer` which is a sort of linked list, this causes [allocations](https://github.com/netty/netty/blob/4.1/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java#L113) as well, I really don't know which is better. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng commented on issue #1057: fixing guava imports that mistakenly use jersey.repackaged.*
jerrypeng commented on issue #1057: fixing guava imports that mistakenly use jersey.repackaged.* URL: https://github.com/apache/incubator-pulsar/pull/1057#issuecomment-357034311 @merlimat @sijie can you guys review. Thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jerrypeng opened a new pull request #1057: fixing guava imports that mistakenly use jersey.repackaged.*
jerrypeng opened a new pull request #1057: fixing guava imports that mistakenly use jersey.repackaged.* URL: https://github.com/apache/incubator-pulsar/pull/1057 ### Motivation Some classes unnecessarily use the guava classes from jersey.repackaged. ### Modifications Changing them to just use Guava package This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] lucperkins commented on issue #1047: Add StatefulSet option for BookKeeper on Google Kubernetes Engine (WIP)
lucperkins commented on issue #1047: Add StatefulSet option for BookKeeper on Google Kubernetes Engine (WIP) URL: https://github.com/apache/incubator-pulsar/pull/1047#issuecomment-357032132 @merlimat Okay, I've made the updates you requested. At the moment I'm still having trouble connecting to the cluster using external clients (the internal client from the `pulsar-admin` pod works fine). I'll take another look tomorrow. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12
merlimat commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12 URL: https://github.com/apache/incubator-pulsar/pull/1056#issuecomment-357031277 @sschepens Thanks for investigating this and finding the root cause! One other option to fix this was to get rid of the `DoubleByteBuf` alltogether. Maybe just keep it as a "holder" of the 2 buffers, but writing directly the 2 buffers to the channel rather than writing the `DoubleByteBuf`. This should be more efficient since we don't have to create the array of `ByteBuffers`. What do you think? Would you want do that? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] saandrews commented on a change in pull request #899: Add subscription auth mode by prefix
saandrews commented on a change in pull request #899: Add subscription auth mode by prefix URL: https://github.com/apache/incubator-pulsar/pull/899#discussion_r161021482 ## File path: pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationManager.java ## @@ -78,14 +80,52 @@ public boolean canProduce(DestinationName destination, String role) throws Excep *the fully qualified destination name associated with the destination. * @param role *the app id used to receive messages from the destination. + * @param subscription + *the subscription name defined by the client */ -public CompletableFuture canConsumeAsync(DestinationName destination, String role) { -return checkAuthorization(destination, role, AuthAction.consume); +public CompletableFuture canConsumeAsync(DestinationName destination, String role, String subscription) { +CompletableFuture permissionFuture = new CompletableFuture<>(); +try { +configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> { +if (!policies.isPresent()) { +if (log.isDebugEnabled()) { +log.debug("Policies node couldn't be found for destination : {}", destination); +} +} else { +if (isNotBlank(subscription)) { +switch (policies.get().subscription_auth_mode) { +case Prefix: +if (!subscription.startsWith(role)) { +PulsarServerException ex = new PulsarServerException( +String.format("Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-", role)); Review comment: Can you add the destination info in the message? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12
sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12 URL: https://github.com/apache/incubator-pulsar/pull/1056#issuecomment-356990524 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12
sschepens commented on issue #1056: DoubleByteBuf fix for Netty > 4.1.12 URL: https://github.com/apache/incubator-pulsar/pull/1056#issuecomment-356990524 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] massakam commented on issue #1055: Fix format of Athenz auth parameters written in document
massakam commented on issue #1055: Fix format of Athenz auth parameters written in document URL: https://github.com/apache/incubator-pulsar/pull/1055#issuecomment-356965385 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sschepens opened a new pull request #1056: DoubleByteBuf fix for Netty > 4.1.12
sschepens opened a new pull request #1056: DoubleByteBuf fix for Netty > 4.1.12 URL: https://github.com/apache/incubator-pulsar/pull/1056 It seems `DoubleByteBuf` is not playing well when used with Native Transports. Using @merlimat [test](https://gist.github.com/merlimat/799b8ba84b5987de369668a748aa0e32) I was able to reproduce this. Switching from Epoll to Nio transport fixes the issue, so this has to be something with native transports. Also, changing [this line](https://github.com/apache/incubator-pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/api/DoubleByteBuf.java#L386) to `allocateDirect` fixes the issue for both Epoll and Nio, but I don't know if it's the right thing to do. The most simple fix is not try to allocate a new `ByteBuffer` but return a concatenation of the components' nio buffers. Therefore, I propose making `DoubleByteBuf.nioBuffers` return a concatenation of the two components' `nioBuffers` when index and length are not needed, this could prove be more efficient. Also make `nioBuffer` allocate a direct buffer if both components are direct, i believe this should be safe also. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1054: Populate message properties after building a message
merlimat commented on a change in pull request #1054: Populate message properties after building a message URL: https://github.com/apache/incubator-pulsar/pull/1054#discussion_r160967552 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java ## @@ -75,12 +81,8 @@ static MessageImpl create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer this.payload = Unpooled.copiedBuffer(payload); if (msgMetadata.getPropertiesCount() > 0) { -Map properties = Maps.newTreeMap(); -for (KeyValue entry : msgMetadata.getPropertiesList()) { -properties.put(entry.getKey(), entry.getValue()); -} - -this.properties = Collections.unmodifiableMap(properties); +this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream() Review comment: Maybe we could set properties to null and do a lazy population This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] massakam commented on issue #1055: Fix format of Athenz auth parameters written in document
massakam commented on issue #1055: Fix format of Athenz auth parameters written in document URL: https://github.com/apache/incubator-pulsar/pull/1055#issuecomment-356930632 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] massakam opened a new pull request #1055: Fix format of Athenz auth parameters written in document
massakam opened a new pull request #1055: Fix format of Athenz auth parameters written in document URL: https://github.com/apache/incubator-pulsar/pull/1055 ### Motivation A format of Athenz auth parameters written in document is old. ### Modifications * Change the format to JSON (#793) * Use `privateKey` parameter instead of `privateKeyPath` (#672) ### Result Users will be able to set the auth parameters of Athenz correctly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ivankelly commented on a change in pull request #1044: Compact algo
ivankelly commented on a change in pull request #1044: Compact algo URL: https://github.com/apache/incubator-pulsar/pull/1044#discussion_r160890288 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/Compactor.java ## @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import com.google.common.collect.ImmutableMap; +import io.netty.buffer.ByteBuf; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.LedgerHandle; + +import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.RawReader; +import org.apache.pulsar.client.api.RawMessage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Compactor for Pulsar topics + * + * Compaction will go through the topic in two passes. The first pass + * selects latest offset for each key in the topic. Then the second pass + * writes these values to a ledger. + * + * The two passes are required to avoid holding the payloads of each of + * the latest values in memory, as the payload can be many orders of + * magnitude larger than a message id. +*/ +public class Compactor { +private static final Logger log = LoggerFactory.getLogger(Compactor.class); +private static final String COMPACTION_SUBSCRIPTION = "__compaction"; +private static final int MAX_OUTSTANDING_READS = 500; +private static final int COMPACTED_TOPIC_ENSEMBLE_SIZE = 3; +private static final int COMPACTED_TOPIC_ACK_QUORUM_SIZE = 3; +private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger"; +static byte[] COMPACTED_TOPIC_LEDGER_PASSWORD = COMPACTION_SUBSCRIPTION.getBytes(); + +private final ExecutorService executor; +private final PulsarClient pulsar; +private final BookKeeper bk; + +public Compactor(PulsarClient pulsar, + BookKeeper bk, + ExecutorService executor) { +this.executor = executor; +this.pulsar = pulsar; +this.bk = bk; +} + +public CompletableFuture compact(String topic) { +final CompletableFuture future += new CompletableFuture<>(); +executor.submit(new CompactionTask(topic, future)); +return future; +} + +private class CompactionTask implements Runnable { +final String topic; +final CompletableFuture future; + +CompactionTask(String topic, CompletableFuture future) { +this.topic = topic; +this.future = future; +} + +@Override +public void run() { +RawReader reader = null; +try { +reader = RawReader.create(pulsar, topic, + COMPACTION_SUBSCRIPTION).get(10, TimeUnit.SECONDS); +Map latestForKey = new HashMap<>(); + +RawMessage firstMessage = reader.readNextAsync().get(10, TimeUnit.SECONDS); +MessageId firstMessageId = firstMessage.getMessageId(); +latestForKey.put(extractKey(firstMessage), firstMessageId); +MessageId latestMessageId = firstMessageId; + +BlockingQueue> futures = new ArrayBlockingQueue>( +MAX_OUTSTANDING_READS); +try { +while (true) { // breaks with exception when there's no more messages +Future f = reader.readNextAsync(); +if (!futures.offer(f)) { +RawMessage