[GitHub] rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection
rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167159624

## File path: pulsar-broker/src/test/resources/authentication/tls/hn-verification/broker-cert.pem ##
@@ -0,0 +1,82 @@
+Certificate:

Review comment:
I checked the rat-check plugin configuration in the [pom](https://github.com/apache/incubator-pulsar/blob/master/pom.xml#L760): it ignores all `*.cert` and `*.key` files but does not exclude `*.pem`, so I will add that too.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection
rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167158945

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java ##
@@ -356,4 +357,21 @@ public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRe
         this.maxNumberOfRejectedRequestPerConnection = maxNumberOfRejectedRequestPerConnection;
     }
 
+    public boolean isTlsHostnameVerificationEnable() {
+        return tlsHostnameVerificationEnable;
+    }
+
+    /**
+     * It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509
+     * certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1. Server
+     * Identity hostname verification.
+     *
+     * @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
+     *
+     * @param tlsHostnameVerificationEnable
+     */
+    public void setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) {

Review comment:
Umm, actually one can use `allowInsecureConnection` in a non-prod environment, which makes the client trust all X.509 certificates without any verification, using `InsecureTrustManagerFactory.java`. However, hostname verification can also be applied on top of a secured connection.

> that is what the HTTP client is following anyway.

Actually, even the HTTP client provides a separate API to [set hostNameVerifier](http://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/impl/client/HttpClientBuilder.html#setHostnameVerifier(org.apache.http.conn.ssl.X509HostnameVerifier)).

So, as the two configs serve different purposes, wouldn't it be better to give that flexibility when configuring them?
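The point made in this comment, that trusting a certificate and verifying the host name are separate decisions, can be illustrated with plain JDK classes. This is only a minimal sketch and not the PR's implementation: the class `TlsSettingsSketch` and its two flags are hypothetical names chosen for illustration. The one real API used is `SSLParameters.setEndpointIdentificationAlgorithm("HTTPS")`, which asks the JDK to apply RFC 2818 style host name checks during the handshake.

```java
import javax.net.ssl.SSLParameters;

// Hypothetical holder for two independent client-side TLS switches.
final class TlsSettingsSketch {
    private final boolean allowInsecureConnection;      // trust any certificate (non-prod only)
    private final boolean hostnameVerificationEnabled;  // match certificate CN/SAN against the host

    TlsSettingsSketch(boolean allowInsecureConnection, boolean hostnameVerificationEnabled) {
        this.allowInsecureConnection = allowInsecureConnection;
        this.hostnameVerificationEnabled = hostnameVerificationEnabled;
    }

    // Trust-manager selection would be driven by this flag; it is not shown here.
    boolean trustsAllCertificates() {
        return allowInsecureConnection;
    }

    // Host name checking is configured separately from certificate trust.
    SSLParameters toSslParameters() {
        SSLParameters params = new SSLParameters();
        if (hostnameVerificationEnabled) {
            params.setEndpointIdentificationAlgorithm("HTTPS");
        }
        return params;
    }

    public static void main(String[] args) {
        TlsSettingsSketch secureWithCheck = new TlsSettingsSketch(false, true);
        System.out.println(secureWithCheck.toSslParameters().getEndpointIdentificationAlgorithm());
    }
}
```

Keeping the two flags apart means a deployment can validate the certificate chain and still choose whether to check the host name, which is the flexibility argued for above.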
[GitHub] rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection
rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167165234

## File path: pulsar-common/src/main/java/org/apache/pulsar/common/util/ssl/DefaultHostnameVerifier.java ##
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.ssl;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.security.cert.Certificate;
+import java.security.cert.CertificateParsingException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+
+import javax.naming.InvalidNameException;
+import javax.naming.NamingException;
+import javax.naming.directory.Attribute;
+import javax.naming.directory.Attributes;
+import javax.naming.ldap.LdapName;
+import javax.naming.ldap.Rdn;
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSession;
+import javax.security.auth.x500.X500Principal;
+
+import org.apache.pulsar.common.util.ssl.PublicSuffixMatcher.DomainType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default {@link javax.net.ssl.HostnameVerifier} implementation. Copied from httpclient.

Review comment:
I have included that as a dependency, excluding all of its transitive dependencies.
[incubator-pulsar] branch asf-site updated: Updated site at revision feb0acc
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/asf-site by this push: new 3f2c1f2 Updated site at revision feb0acc 3f2c1f2 is described below commit 3f2c1f29debfc3baf5e75bbab37a466c4315469f Author: jenkins AuthorDate: Fri Feb 9 08:46:36 2018 + Updated site at revision feb0acc --- content/docs/latest/project/BinaryProtocol/index.html | 7 +++ content/ja/project/BinaryProtocol/index.html | 7 +++ 2 files changed, 14 insertions(+) diff --git a/content/docs/latest/project/BinaryProtocol/index.html b/content/docs/latest/project/BinaryProtocol/index.html index 50fa4fd..29b81ec 100644 --- a/content/docs/latest/project/BinaryProtocol/index.html +++ b/content/docs/latest/project/BinaryProtocol/index.html @@ -5438,6 +5438,13 @@ incrementally added to the protocol Added proxy to broker + + + v11 + 11 + C++ consumers before this version are not correctly handling the checksum field + + diff --git a/content/ja/project/BinaryProtocol/index.html b/content/ja/project/BinaryProtocol/index.html index bb85768..70c5c9d 100644 --- a/content/ja/project/BinaryProtocol/index.html +++ b/content/ja/project/BinaryProtocol/index.html @@ -5223,6 +5223,13 @@ incrementally added to the protocol Added proxy to broker + + + v11 + 11 + C++ consumers before this version are not correctly handling the checksum field + + -- To stop receiving notification emails like this one, please contact mme...@apache.org.
[GitHub] ivankelly commented on a change in pull request #1205: Algorithm to find start point of compacted ledger
ivankelly commented on a change in pull request #1205: Algorithm to find start point of compacted ledger URL: https://github.com/apache/incubator-pulsar/pull/1205#discussion_r167196940 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java ## @@ -18,10 +18,111 @@ */ package org.apache.pulsar.compaction; +import com.google.common.cache.Cache; +import com.google.common.collect.ComparisonChain; + +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair; +import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.impl.RawMessageImpl; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CompactedTopicImpl implements CompactedTopic { +final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL; + @Override public void newCompactedLedger(Position p, long compactedLedgerId) {} + +static CompletableFuture findStartPoint(LedgerHandle lh, PositionImpl p, + Cache cache) { +CompletableFuture promise = new CompletableFuture<>(); +findStartPointLoop(lh, p, 0, lh.getLastAddConfirmed(), promise, cache); +return promise; +} + +private static void findStartPointLoop(LedgerHandle lh, PositionImpl p, long start, long end, + CompletableFuture promise, + Cache cache) { +long midpoint = start + ((end - start) / 2); + +CompletableFuture startEntry = readOneMessageId(lh, start, cache); +CompletableFuture middleEntry = readOneMessageId(lh, midpoint, cache); +CompletableFuture endEntry = readOneMessageId(lh, end, cache); + 
+CompletableFuture.allOf(startEntry, middleEntry, endEntry).whenComplete( +(v, exception) -> { +if (exception != null) { +promise.completeExceptionally(exception); +} +try { +if (comparePositionAndMessageId(p, startEntry.get()) < 0) { +promise.complete(start); +} else if (comparePositionAndMessageId(p, middleEntry.get()) < 0) { +findStartPointLoop(lh, p, start, midpoint, promise, cache); +} else if (comparePositionAndMessageId(p, endEntry.get()) < 0) { +findStartPointLoop(lh, p, midpoint + 1, end, promise, cache); +} else { +promise.complete(NEWER_THAN_COMPACTED); +} +} catch (InterruptedException ie) { +// should never happen as all should have been completed +Thread.currentThread().interrupt(); +log.error("Interrupted waiting on futures which should have completed", ie); +} catch (ExecutionException e) { +// shouldn't happen, allOf should have given us the exception +promise.completeExceptionally(e); +} +}); +} + +private static CompletableFuture readOneMessageId(LedgerHandle lh, long entryId, Review comment: Ah, I wasn't aware of Caffeine. Changed to use it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
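The search in the hunk above can be hard to follow in its asynchronous form. The sketch below restates the same three comparisons (start, midpoint, end) synchronously over a sorted array. It is an illustration under stated assumptions, not the PR's code: the class name `CompactedStartPointSketch` is hypothetical, a plain `long[]` of ascending ids stands in for the ledger entries, and no cache or ledger I/O is modelled. The sentinel value is the one shown in the diff.

```java
// Synchronous stand-in for the asynchronous start-point search in the diff.
final class CompactedStartPointSketch {
    // Same sentinel value as in the quoted diff.
    static final long NEWER_THAN_COMPACTED = -0xfeed0fbaL;

    // Returns the index of the first id strictly greater than p,
    // or NEWER_THAN_COMPACTED if every id is less than or equal to p.
    // Assumes ids is sorted in ascending order.
    static long findStartPoint(long[] ids, long p) {
        if (ids.length == 0) {
            return NEWER_THAN_COMPACTED;
        }
        int start = 0;
        int end = ids.length - 1;
        while (true) {
            int midpoint = start + ((end - start) / 2);
            if (p < ids[start]) {
                return start;            // p precedes the current window
            } else if (p < ids[midpoint]) {
                end = midpoint;          // answer lies in [start, midpoint]
            } else if (p < ids[end]) {
                start = midpoint + 1;    // answer lies in [midpoint + 1, end]
            } else {
                return NEWER_THAN_COMPACTED;
            }
        }
    }

    public static void main(String[] args) {
        long[] ids = {10, 20, 30, 40};
        System.out.println(findStartPoint(ids, 25));
    }
}
```

Each iteration either returns or strictly shrinks the window, so the loop terminates after a logarithmic number of steps.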
[GitHub] zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167251458

## File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java ##
@@ -359,4 +362,59 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe
         reader.close();
         log.info("-- Exiting {} test --", methodName);
     }
+
+
+    @Test
+    public void testSimpleReaderReachEndofTopic() throws Exception {

Review comment:
Thanks, will add the test.
[GitHub] zhaijack commented on issue #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
zhaijack commented on issue #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#issuecomment-364459531

retest this please
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167252486

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java ##
@@ -37,6 +36,16 @@
      */
     byte[] toByteArray();
 
+    /**
+     * Get the topic name of this MessageId.
+     * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic.
+     *
+     * @return the topic name
+     */
+    default String getTopicName() {

Review comment:
Yes, it is as @merlimat mentioned: for MessageId, we also need `topicName` for `redeliverUnacknowledgedMessages`.
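The idea of exposing the topic name through a default interface method can be sketched as follows. This is a hedged illustration only: `SketchMessageId`, `PlainId` and `TopicId` are hypothetical types, and the choice to return `null` from the default is an assumption, since the body of `getTopicName()` is not visible in the hunk.

```java
// Hypothetical stand-in for the MessageId interface discussed above.
interface SketchMessageId {
    byte[] toByteArray();

    // Assumption: ids that are not bound to a topic report no topic name.
    default String getTopicName() {
        return null;
    }
}

// An id with no topic information; it inherits the default.
final class PlainId implements SketchMessageId {
    @Override
    public byte[] toByteArray() {
        return new byte[0];
    }
}

// An id that remembers which topic it came from, as a multi-topic consumer would need.
final class TopicId implements SketchMessageId {
    private final String topicName;

    TopicId(String topicName) {
        this.topicName = topicName;
    }

    @Override
    public byte[] toByteArray() {
        return topicName.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    }

    @Override
    public String getTopicName() {
        return topicName;
    }
}
```

A default method lets existing implementations compile unchanged while topic-aware ids override it, which is what makes the name available to callers such as a redelivery routine.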
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167258867

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ##
@@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {

Review comment:
HandlerBase seems useful only for ConsumerImpl and ProducerImpl. There is an open issue that plans to remove it from ConsumerBase. Since ConsumerBase has already done some of the work, I would like to leverage it rather than duplicate that work.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167261106

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java ##
@@ -468,17 +468,20 @@ public void redeliverUnacknowledgedMessages() {
     }
 
     @Override
-    public void redeliverUnacknowledgedMessages(Set messageIds) {
+    public void redeliverUnacknowledgedMessages(Set messageIds) {

Review comment:
Thanks. As in the reply above, we also need the redeliverUnacknowledgedMessages method in Consumer.java to handle TopicMessageIdImpl. I would like to change this in Consumer.java and do the cast in each child class.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167263964

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java ##
@@ -18,26 +18,24 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.function.Predicate;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 public class UnAckedMessageTracker implements Closeable {

Review comment:
UnAckedMessageTracker is not only a member of ConsumerImpl; it should also be a member of PartitionedConsumer and TopicsConsumer here. Most of the change is to leverage MessageId::compareTo to make the code clearer, and it does not seem to cast the type too often here.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167263964

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java ##
@@ -18,26 +18,24 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.function.Predicate;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 public class UnAckedMessageTracker implements Closeable {

Review comment:
Thanks. UnAckedMessageTracker is not only a member of ConsumerImpl; it should also be a member of PartitionedConsumer and TopicsConsumer here. Most of the change is to leverage MessageId::compareTo to make the code clearer, and it does not seem to cast the type too often here.
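What "leveraging compareTo" might look like in a tracker can be sketched with a generic set and a predicate. This is an assumption-laden illustration rather than the PR's code: `UnackedTrackerSketch` and its methods are hypothetical, a plain `HashSet` replaces the concurrent set and locking of the real class, and any `Comparable` type stands in for `MessageId`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, single-threaded tracker of ids awaiting acknowledgement.
final class UnackedTrackerSketch<T extends Comparable<T>> {
    private final Set<T> pending = new HashSet<>();

    boolean add(T id) {
        return pending.add(id);
    }

    // Removes every tracked id that is less than or equal to upTo
    // (a cumulative acknowledgement) and returns how many were removed.
    // The ordering comes from compareTo, so no cast to a concrete id type is needed.
    int removeUpTo(T upTo) {
        int before = pending.size();
        pending.removeIf(id -> id.compareTo(upTo) <= 0);
        return before - pending.size();
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        UnackedTrackerSketch<Long> tracker = new UnackedTrackerSketch<>();
        tracker.add(1L);
        tracker.add(2L);
        tracker.add(5L);
        System.out.println(tracker.removeUpTo(2L));
    }
}
```

Because the predicate depends only on the ordering, the same tracker can serve single-topic, partitioned and multi-topic consumers as long as their ids are mutually comparable.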
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167264860 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map , when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
[GitHub] merlimat commented on a change in pull request #1209: Fixed missing '"' sign in system metrics for Prometheus
merlimat commented on a change in pull request #1209: Fixed missing '"' sign in system metrics for Prometheus
URL: https://github.com/apache/incubator-pulsar/pull/1209#discussion_r167297551

## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java ##
@@ -85,14 +85,13 @@ private static void generateSystemMetrics(SimpleTextOutputStream stream, String
             for (int i = 0; i < metricFamily.samples.size(); i++) {
                 Sample sample = metricFamily.samples.get(i);
                 stream.write(sample.name);
-                stream.write("{cluster=\"").write(cluster).write("\",");
+                stream.write("{cluster=\"").write(cluster).write('"');

Review comment:
Yes, in the previous commit I added a parser for the Prometheus text format to verify the metrics in the unit tests. The parser is regex-based, though, and does not validate the format 100%. In this PR I added a check on the value of the tag, so that the test fails before the fix and passes after it.
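The kind of check described here, writing a sample line and then reading the tag value back with a regular expression, can be sketched as follows. This is a simplified illustration and not the project's parser: `PrometheusLineSketch`, `formatSample` and `parseCluster` are hypothetical names, and the sketch handles only a single `cluster` label.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that writes and re-reads one Prometheus sample line.
final class PrometheusLineSketch {
    // name{cluster="value"} number
    private static final Pattern SAMPLE =
            Pattern.compile("^(\\w+)\\{cluster=\"([^\"]*)\"\\} (\\S+)$");

    static String formatSample(String name, String cluster, double value) {
        return name + "{cluster=\"" + cluster + "\"} " + value;
    }

    // Returns the cluster label value, or throws if the line is malformed,
    // for example when a closing quote is missing.
    static String parseCluster(String line) {
        Matcher m = SAMPLE.matcher(line);
        if (!m.matches()) {
            throw new IllegalArgumentException("Malformed sample line: " + line);
        }
        return m.group(2);
    }

    public static void main(String[] args) {
        String line = formatSample("jvm_threads", "standalone", 42.0);
        System.out.println(line + " -> " + parseCluster(line));
    }
}
```

Asserting on the extracted label value, rather than only on whether the line matches loosely, is what lets a test catch an unbalanced quote.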
[GitHub] merlimat commented on issue #1074: Document features and usage of Pulsar proxy
merlimat commented on issue #1074: Document features and usage of Pulsar proxy URL: https://github.com/apache/incubator-pulsar/issues/1074#issuecomment-364506591 Fixed in #1086 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat closed issue #1074: Document features and usage of Pulsar proxy
merlimat closed issue #1074: Document features and usage of Pulsar proxy URL: https://github.com/apache/incubator-pulsar/issues/1074 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1200: Add pluggable authorization mechanism
merlimat commented on a change in pull request #1200: Add pluggable authorization mechanism URL: https://github.com/apache/incubator-pulsar/pull/1200#discussion_r167298488 ## File path: conf/proxy.conf ## @@ -49,6 +49,9 @@ authenticationProviders= # Enforce authorization authorizationEnabled=false +# Authorization provider name list, which is comma separated list of class names +authorizationProviders=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider Review comment: Ok, it is always easier to add it if needed than to remove it later. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1205: Algorithm to find start point of compacted ledger
merlimat commented on a change in pull request #1205: Algorithm to find start point of compacted ledger URL: https://github.com/apache/incubator-pulsar/pull/1205#discussion_r167300225 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java ## @@ -18,10 +18,109 @@ */ package org.apache.pulsar.compaction; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.ComparisonChain; + +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.impl.RawMessageImpl; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CompactedTopicImpl implements CompactedTopic { +final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL; + @Override public void newCompactedLedger(Position p, long compactedLedgerId) {} + +static CompletableFuture findStartPoint(PositionImpl p, + long lastEntryId, + AsyncLoadingCache cache) { +CompletableFuture promise = new CompletableFuture<>(); +findStartPointLoop(p, 0, lastEntryId, promise, cache); +return promise; +} + +private static void findStartPointLoop(PositionImpl p, long start, long end, + CompletableFuture promise, + AsyncLoadingCache cache) { +long midpoint = start + ((end - start) / 2); + +CompletableFuture startEntry = cache.get(start); +CompletableFuture middleEntry = cache.get(midpoint); +CompletableFuture endEntry = cache.get(end); + +CompletableFuture.allOf(startEntry, middleEntry, endEntry).whenComplete( +(v, 
exception) -> { +if (exception != null) { +promise.completeExceptionally(exception); +} +try { +if (comparePositionAndMessageId(p, startEntry.get()) < 0) { Review comment: Nit: you could use `.join()` instead of `get()`; it throws an unchecked exception, and in this case we're not expecting any. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
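The `join()` suggestion in the review comment above can be sketched as follows. This is an illustration under assumptions, not the code from the PR: `JoinVsGetSketch` and `sumWhenAllDone` are hypothetical names, and plain integers stand in for the cached ledger entries. Because `allOf` has already completed when the callback runs, `join()` returns immediately and needs no `try`/`catch` for checked exceptions.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example: combine two futures once both are done.
final class JoinVsGetSketch {
    static CompletableFuture<Integer> sumWhenAllDone(CompletableFuture<Integer> a,
                                                     CompletableFuture<Integer> b) {
        CompletableFuture<Integer> promise = new CompletableFuture<>();
        CompletableFuture.allOf(a, b).whenComplete((ignored, exception) -> {
            if (exception != null) {
                promise.completeExceptionally(exception);
                return; // do not fall through to the success path
            }
            // Both futures are complete here, so join() does not block and,
            // unlike get(), declares no checked exceptions.
            promise.complete(a.join() + b.join());
        });
        return promise;
    }
}
```

With `get()` the same lambda would need a `try` block for `InterruptedException` and `ExecutionException`, neither of which is expected once `allOf` has succeeded.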
[GitHub] merlimat commented on a change in pull request #1205: Algorithm to find start point of compacted ledger
merlimat commented on a change in pull request #1205: Algorithm to find start point of compacted ledger URL: https://github.com/apache/incubator-pulsar/pull/1205#discussion_r167300931 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java ## @@ -18,10 +18,109 @@ */ package org.apache.pulsar.compaction; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.ComparisonChain; + +import java.util.NoSuchElementException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.client.api.RawMessage; +import org.apache.pulsar.client.impl.RawMessageImpl; +import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; + + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class CompactedTopicImpl implements CompactedTopic { +final static long NEWER_THAN_COMPACTED = -0xfeed0fbaL; + @Override public void newCompactedLedger(Position p, long compactedLedgerId) {} + +static CompletableFuture findStartPoint(PositionImpl p, + long lastEntryId, + AsyncLoadingCache cache) { +CompletableFuture promise = new CompletableFuture<>(); +findStartPointLoop(p, 0, lastEntryId, promise, cache); +return promise; +} + +private static void findStartPointLoop(PositionImpl p, long start, long end, + CompletableFuture promise, + AsyncLoadingCache cache) { +long midpoint = start + ((end - start) / 2); + +CompletableFuture startEntry = cache.get(start); +CompletableFuture middleEntry = cache.get(midpoint); +CompletableFuture endEntry = cache.get(end); + +CompletableFuture.allOf(startEntry, middleEntry, endEntry).whenComplete( +(v, 
exception) -> { +if (exception != null) { +promise.completeExceptionally(exception); +} +try { +if (comparePositionAndMessageId(p, startEntry.get()) < 0) { Review comment: Also, if you do it like this:

```java
CompletableFuture.allOf(startEntry, middleEntry, endEntry).thenRun(() -> {
    // Success case
}).exceptionally(ex -> {
    promise.completeExceptionally(ex);
    return null;
});
```

the `exceptionally` block will also be executed if any exception is thrown in the `thenRun()` code, in addition to the async operation failing. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
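A small self-contained sketch of the behavior described in the comment above: an `exceptionally` stage chained after `thenRun()` sees both upstream failures and exceptions thrown inside the `thenRun()` callback. The class `ExceptionallySketch` and the `failInCallback` flag are hypothetical and exist only to trigger each case.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example: route every failure, wherever it occurs, into one promise.
final class ExceptionallySketch {
    static CompletableFuture<String> run(CompletableFuture<Void> upstream, boolean failInCallback) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        upstream.thenRun(() -> {
            // Success case; an exception thrown here fails the stage returned by thenRun()
            if (failInCallback) {
                throw new IllegalStateException("bug in success path");
            }
            promise.complete("ok");
        }).exceptionally(ex -> {
            // Reached when upstream failed OR when the thenRun() callback threw
            promise.completeExceptionally(ex);
            return null;
        });
        return promise;
    }
}
```

The practical benefit is that a bug in the success path cannot leave `promise` uncompleted, which would otherwise hang any caller waiting on it.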
[GitHub] merlimat commented on a change in pull request #1208: Add hostname-verification at client tls connection
merlimat commented on a change in pull request #1208: Add hostname-verification at client tls connection URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167302047 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ClientConfiguration.java ## @@ -356,4 +357,21 @@ public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRe this.maxNumberOfRejectedRequestPerConnection = maxNumberOfRejectedRequestPerConnection; } +public boolean isTlsHostnameVerificationEnable() { +return tlsHostnameVerificationEnable; +} + +/** + * It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509 + * certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1. Server + * Identity hostname verification. + * + * @see https://tools.ietf.org/html/rfc2818";>rfc2818 + * + * @param tlsHostnameVerificationEnable + */ +public void setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) { Review comment: Sure, we should just make sure the same checks are done in HTTP vs. protobuf for the same config. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1208: Add hostname-verification at client tls connection
merlimat commented on a change in pull request #1208: Add hostname-verification at client tls connection URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167302744 ## File path: pom.xml ## @@ -138,6 +138,18 @@ flexible messaging model and an intuitive client API. + Review comment: This dependency should be added at the end of `all/src/assemble/LICENSE.bin.txt` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1208: Add hostname-verification at client tls connection
merlimat commented on a change in pull request #1208: Add hostname-verification at client tls connection URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167302503 ## File path: pulsar-client/pom.xml ## @@ -74,6 +74,17 @@ com.google.code.gson gson + + + org.apache.httpcomponents Review comment: This should be added to the client shading as well. Also, all the other shading configs need to reuse the same list as the client shading config. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] jai1 commented on issue #1208: Add hostname-verification at client tls connection
jai1 commented on issue #1208: Add hostname-verification at client tls connection URL: https://github.com/apache/incubator-pulsar/pull/1208#issuecomment-364523700 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection
rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167318158 ## File path: pom.xml ## @@ -138,6 +138,18 @@ flexible messaging model and an intuitive client API. + Review comment: I have added `org.apache.httpcomponents:httpclient` to the client-shaded pom and the LICENSE file. I excluded all other jars from httpclient to avoid bringing in a lot of other things, but it requires `commons-logging:commons-logging` for internal logging; without it we see a ClassNotFoundException for the logger class. So I have added that dependency explicitly and added it to LICENSE and client-shaded as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat opened a new pull request #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy
merlimat opened a new pull request #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy URL: https://github.com/apache/incubator-pulsar/pull/1210 ### Motivation After initial handshake, the Pulsar proxy switches to TCP proxy mode, by just copying buffers between the 2 connections and avoiding all parsing. When that happens, for keep-alive messages, we rely on what the client and broker are exchanging, so they will be able to detect a stale connection client-proxy or proxy-broker. Currently, we're not removing the keep-alive timer in the proxy, so it's forcefully closing the connection after 60s, even though client and broker are perfectly fine. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
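The motivation above can be illustrated with a simplified sketch: a connection arms a keep-alive timer when it becomes active and cancels it once it switches to pass-through mode, so the timer can no longer close a healthy connection. Everything here is an assumption for illustration (the class `KeepAliveSketch`, the one-shot timer, and the short timeouts standing in for the 60s mentioned above); it does not reproduce the Pulsar proxy's handler classes, and it treats the timer firing as a timeout rather than modelling ping/pong exchange.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical connection that closes itself when its keep-alive timer fires.
final class KeepAliveSketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> keepAliveTask;
    private volatile boolean closed;

    void channelActive(long timeoutMillis) {
        // Simplification: treat the timer firing as a keep-alive timeout
        keepAliveTask = timer.schedule(() -> { closed = true; }, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    void switchToTcpProxy() {
        // From here on bytes are only copied; client and broker exchange keep-alives themselves
        cancelKeepAliveTask();
    }

    void cancelKeepAliveTask() {
        if (keepAliveTask != null) {
            keepAliveTask.cancel(false);
            keepAliveTask = null;
        }
    }

    // Demo helper: reports whether the connection was closed by the timer.
    static boolean closedAfterTimeout(boolean switchToTcpProxy) {
        KeepAliveSketch conn = new KeepAliveSketch();
        try {
            conn.channelActive(50);
            if (switchToTcpProxy) {
                conn.switchToTcpProxy();
            }
            Thread.sleep(200); // wait well past the 50 ms timeout
            return conn.closed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            conn.timer.shutdownNow();
        }
    }
}
```

Clearing the task reference after cancelling makes the cancel method safe to call more than once, for example once on the mode switch and again when the channel goes inactive.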
[GitHub] merlimat opened a new issue #1211: Intermittent test failure int ProxyForwardAuthDataTest.testForwardAuthData
merlimat opened a new issue #1211: Intermittent test failure int ProxyForwardAuthDataTest.testForwardAuthData URL: https://github.com/apache/incubator-pulsar/issues/1211 https://builds.apache.org/job/pulsar-pull-request/org.apache.pulsar$pulsar-proxy/1581/testReport/junit/org.apache.pulsar.proxy.server/ProxyForwardAuthDataTest/testForwardAuthData/ ``` Error Message java.nio.channels.ClosedChannelException Stacktrace org.apache.pulsar.client.api.PulsarClientException: java.nio.channels.ClosedChannelException at org.apache.pulsar.client.impl.PulsarClientImpl.subscribe(PulsarClientImpl.java:215) at org.apache.pulsar.client.impl.PulsarClientImpl.subscribe(PulsarClientImpl.java:202) at org.apache.pulsar.proxy.server.ProxyForwardAuthDataTest.testForwardAuthData(ProxyForwardAuthDataTest.java:140) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #1211: Intermittent test failure int ProxyForwardAuthDataTest.testForwardAuthData
merlimat commented on issue #1211: Intermittent test failure int ProxyForwardAuthDataTest.testForwardAuthData URL: https://github.com/apache/incubator-pulsar/issues/1211#issuecomment-364546359 @jai1 Can you take a look at this? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1208: Add hostname-verification at client tls connection
merlimat commented on a change in pull request #1208: Add hostname-verification at client tls connection URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167332300 ## File path: pulsar-client-shaded/pom.xml ## @@ -81,6 +81,8 @@ org.apache.pulsar:pulsar-checksum net.jpountz.lz4:lz4 com.yahoo.datasketches:sketches-core + org.apache.httpcomponents:httpclient + commons-logging:commons-logging Review comment: Can you also add these in `pulsar-broker-shaded/pom.xml` and `pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml`. I know it's painful, but couldn't find a reliable way to do the shading in the different modules. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection
rdhabalia commented on a change in pull request #1208: Add hostname-verification at client tls connection URL: https://github.com/apache/incubator-pulsar/pull/1208#discussion_r167332606 ## File path: pulsar-client-shaded/pom.xml ## @@ -81,6 +81,8 @@ org.apache.pulsar:pulsar-checksum net.jpountz.lz4:lz4 com.yahoo.datasketches:sketches-core + org.apache.httpcomponents:httpclient + commons-logging:commons-logging Review comment: I see..let me add it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167336300 ## File path: pulsar-common/src/main/proto/PulsarApi.proto ## @@ -135,6 +135,7 @@ enum ProtocolVersion { v9 = 9; // Added end of topic notification v10 = 10;// Added proxy to broker v11 = 11;// C++ consumers before this version are not correctly handling the checksum field +//Added get topic's last messageId from broker Review comment: @zhaijack There was a merge issue here, `v11` was already taken in master and this PR should be adding `v12` now. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat closed pull request #1209: Fixed missing '"' sign in system metrics for Prometheus
merlimat closed pull request #1209: Fixed missing '"' sign in system metrics for Prometheus URL: https://github.com/apache/incubator-pulsar/pull/1209 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 26118c81a..e7bcda1b0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -85,14 +85,13 @@ private static void generateSystemMetrics(SimpleTextOutputStream stream, String for (int i = 0; i < metricFamily.samples.size(); i++) { Sample sample = metricFamily.samples.get(i); stream.write(sample.name); -stream.write("{cluster=\"").write(cluster).write("\","); +stream.write("{cluster=\"").write(cluster).write('"'); for (int j = 0; j < sample.labelNames.size(); j++) { +stream.write(", "); stream.write(sample.labelNames.get(j)); stream.write("=\""); stream.write(sample.labelValues.get(j)); -if (j != sample.labelNames.size() - 1) { -stream.write("\","); -} +stream.write('"'); } stream.write("} "); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index be19b571d..8aa6fdc2c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -90,6 +90,11 @@ public void testPerTopicStats() throws Exception { 
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); +cm = (List) metrics.get("topic_load_times_count"); +assertEquals(cm.size(), 1); +assertEquals(cm.get(0).value, 2.0); +assertEquals(cm.get(0).tags.get("cluster"), "test"); + p1.close(); p2.close(); } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: Fixed missing '"' sign in system metrics for Prometheus (#1209)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 50beaca Fixed missing '"' sign in system metrics for Prometheus (#1209) 50beaca is described below commit 50beacae60cb6ec9384c1fa2b39c4ca3cb7cbe80 Author: Matteo Merli AuthorDate: Fri Feb 9 12:30:02 2018 -0800 Fixed missing '"' sign in system metrics for Prometheus (#1209) --- .../pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java | 7 +++ .../java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java | 5 + 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 26118c8..e7bcda1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -85,14 +85,13 @@ public class PrometheusMetricsGenerator { for (int i = 0; i < metricFamily.samples.size(); i++) { Sample sample = metricFamily.samples.get(i); stream.write(sample.name); -stream.write("{cluster=\"").write(cluster).write("\","); +stream.write("{cluster=\"").write(cluster).write('"'); for (int j = 0; j < sample.labelNames.size(); j++) { +stream.write(", "); stream.write(sample.labelNames.get(j)); stream.write("=\""); stream.write(sample.labelValues.get(j)); -if (j != sample.labelNames.size() - 1) { -stream.write("\","); -} +stream.write('"'); } stream.write("} "); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index be19b57..8aa6fdc 100644 
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -90,6 +90,11 @@ public class PrometheusMetricsTest extends BrokerTestBase { assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns"); +cm = (List) metrics.get("topic_load_times_count"); +assertEquals(cm.size(), 1); +assertEquals(cm.get(0).value, 2.0); +assertEquals(cm.get(0).tags.get("cluster"), "test"); + p1.close(); p2.close(); } -- To stop receiving notification emails like this one, please contact mme...@apache.org.
[GitHub] rdhabalia commented on issue #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy
rdhabalia commented on issue #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy URL: https://github.com/apache/incubator-pulsar/pull/1210#issuecomment-364559864
> so it's forcefully closing the connection after 60s

but right now, shouldn't proxy also handle client's keep-alive request on `PulsarHandler.handlePing()` which should keep connection alive between client to proxy and same way proxy to broker? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy
merlimat commented on issue #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy URL: https://github.com/apache/incubator-pulsar/pull/1210#issuecomment-364562235
> but right now, shouldn't proxy also handle client's keep-alive request on PulsarHandler.handlePing() which should keep connection alive between client to proxy and same way proxy to broker?

The proxy just passes along bytes at that point. The ping from the client is just passed on to the broker, which is the one that will respond. If there is a network partition either between client/proxy or proxy/broker, it will be identified. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] rdhabalia commented on issue #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy
rdhabalia commented on issue #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy URL: https://github.com/apache/incubator-pulsar/pull/1210#issuecomment-364563293 yes, once proxy state is in `ProxyConnectionToBroker` then it becomes just pass through. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat closed pull request #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy
merlimat closed pull request #1210: Cancel keep-alive timer task after the proxy switch to TCP proxy URL: https://github.com/apache/incubator-pulsar/pull/1210 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java index 3d8e7ce15..1094359b8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java @@ -67,9 +67,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { -if (keepAliveTask != null) { -keepAliveTask.cancel(false); -} +cancelKeepAliveTask(); } @Override @@ -113,6 +111,13 @@ private void handleKeepAliveTimeout() { } } +protected void cancelKeepAliveTask() { +if (keepAliveTask != null) { +keepAliveTask.cancel(false); +keepAliveTask = null; +} +} + /** * @return true if the connection is ready to use, meaning the Pulsar handshake was already completed */ diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 921376fc2..68bd02295 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -168,6 +168,7 @@ protected void handleConnect(CommandConnect connect) { // there and just pass bytes in both directions state = State.ProxyConnectionToBroker; directProxyHandler = new DirectProxyHandler(service, this, connect.getProxyToBrokerUrl()); 
+cancelKeepAliveTask(); } else { // Client is doing a lookup, we can consider the handshake complete and we'll take care of just topics and // partitions metadata lookups This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[incubator-pulsar] branch master updated: Cancel keep-alive timer task after the proxy switch to TCP proxy (#1210)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new f288db5 Cancel keep-alive timer task after the proxy switch to TCP proxy (#1210) f288db5 is described below commit f288db58b45821ee7f5df2694eada6236afbe1a4 Author: Matteo Merli AuthorDate: Fri Feb 9 13:02:28 2018 -0800 Cancel keep-alive timer task after the proxy switch to TCP proxy (#1210) --- .../main/java/org/apache/pulsar/common/api/PulsarHandler.java | 11 --- .../java/org/apache/pulsar/proxy/server/ProxyConnection.java | 1 + 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java index 3d8e7ce..1094359 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarHandler.java @@ -67,9 +67,7 @@ public abstract class PulsarHandler extends PulsarDecoder { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { -if (keepAliveTask != null) { -keepAliveTask.cancel(false); -} +cancelKeepAliveTask(); } @Override @@ -113,6 +111,13 @@ public abstract class PulsarHandler extends PulsarDecoder { } } +protected void cancelKeepAliveTask() { +if (keepAliveTask != null) { +keepAliveTask.cancel(false); +keepAliveTask = null; +} +} + /** * @return true if the connection is ready to use, meaning the Pulsar handshake was already completed */ diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index 921376f..68bd022 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -168,6 +168,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167372153 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map , when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167382938

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ##

@@ -1248,6 +1254,77 @@ public void seek(MessageId messageId) throws PulsarClientException {
         return seekFuture;
     }

+    public boolean hasMessageAvailable() throws PulsarClientException {
+        try {
+            return hasMessageAvailableAsync().get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
+
+        if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+            ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+            booleanFuture.complete(true);
+        } else {
+            getLastMessageIdAsync().thenAccept(messageId -> {
+                lastMessageIdInBroker = messageId;
+                if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+                    ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+                    booleanFuture.complete(true);
+                } else {
+                    booleanFuture.complete(false);
+                }
+            }).exceptionally(e -> {
+                log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
+                booleanFuture.completeExceptionally(e.getCause());
+                return null;
+            });
+        }
+        return booleanFuture;
+    }
+
+    private CompletableFuture<MessageId> getLastMessageIdAsync() {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil
+                .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+        }
+
+        if (cnx().getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
+            return FutureUtil
+                .failedFuture(new PulsarClientException
+                    .NotSupportedException("GetLastMessageId Not supported for ProtocolVersion: " +
+                        cnx().getRemoteEndpointProtocolVersion()));
+        }
+
+        if (!isConnected()) {

Review comment:
If it's currently not connected, we should try to mask the exception from the user if the failure is transient. There is already an `operationTimeout` defined in the client; it would be good to have a way to retry internally with backoff up to that amount of time.
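
One way to read the suggestion above is a retry loop that doubles its delay after each failure and gives up once a time budget (the client's `operationTimeout` in the comment) is spent. The sketch below is only an illustration under that reading: `RetryWithBackoff`, `retry`, and the parameter names are hypothetical, not Pulsar APIs, and the budget here counts only the time spent waiting between attempts, not the time each attempt takes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper: retries an async operation with exponential backoff
// until it succeeds or the wait budget is exhausted.
class RetryWithBackoff {

    public static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> operation,
                                                 ScheduledExecutorService scheduler,
                                                 long initialDelayMs,
                                                 long budgetMs) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, scheduler, initialDelayMs, budgetMs, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> operation,
                                    ScheduledExecutorService scheduler,
                                    long delayMs,
                                    long remainingMs,
                                    CompletableFuture<T> result) {
        operation.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (remainingMs <= 0) {
                // Budget spent: surface the last failure to the caller.
                result.completeExceptionally(error);
            } else {
                // Wait, then try again with a doubled delay and a reduced budget.
                long wait = Math.min(delayMs, remainingMs);
                scheduler.schedule(
                        () -> attempt(operation, scheduler, delayMs * 2, remainingMs - wait, result),
                        wait, TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

A caller would wrap the broker request in the supplier, so transient "not connected" failures are absorbed and the user only sees an exception once the budget runs out.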
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167382781

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ##

@@ -1248,6 +1254,77 @@ public void seek(MessageId messageId) throws PulsarClientException {
         return seekFuture;
     }

+    public boolean hasMessageAvailable() throws PulsarClientException {
+        try {
+            return hasMessageAvailableAsync().get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
+
+        if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&

Review comment:
We should do this check also in the sync `hasMessageAvailable()` case, so that we can avoid creating a `CompletableFuture` each time we check, if the `lastMessageId` is already cached.
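
The point of the comment above is that the synchronous path can answer from the cached last message id without allocating a future. A simplified sketch of that fast path follows. Everything in it is a stand-in: message ids are plain `long` values rather than `MessageId` objects, `-1` marks "no id known yet", and the broker round trip is a `Supplier` passed in by the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical model of a reader that caches the broker's last message id.
class CachedAvailabilityCheck {
    private final Supplier<CompletableFuture<Long>> brokerLookup; // stands in for the GetLastMessageId request
    private volatile long lastIdInBroker = -1L;
    private volatile long lastDequeuedId = -1L;

    CachedAvailabilityCheck(Supplier<CompletableFuture<Long>> brokerLookup) {
        this.brokerLookup = brokerLookup;
    }

    public void markDequeued(long id) {
        lastDequeuedId = id;
    }

    // Shared check used by both the sync and the async entry points.
    private boolean cachedIdIsAhead() {
        return lastIdInBroker != -1L && lastIdInBroker > lastDequeuedId;
    }

    public boolean hasMessageAvailable() {
        if (cachedIdIsAhead()) {
            return true; // fast path: no CompletableFuture is created
        }
        return hasMessageAvailableAsync().join();
    }

    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
        if (cachedIdIsAhead()) {
            return CompletableFuture.completedFuture(true);
        }
        return brokerLookup.get().thenApply(id -> {
            lastIdInBroker = id;
            return cachedIdIsAhead();
        });
    }
}
```

Factoring the comparison into one private method also removes the duplicated condition that appears twice in the quoted diff.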
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167382822

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ##

@@ -1248,6 +1254,77 @@ public void seek(MessageId messageId) throws PulsarClientException {
         return seekFuture;
     }

+    public boolean hasMessageAvailable() throws PulsarClientException {
+        try {
+            return hasMessageAvailableAsync().get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
+
+        if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+            ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+            booleanFuture.complete(true);
+        } else {
+            getLastMessageIdAsync().thenAccept(messageId -> {
+                lastMessageIdInBroker = messageId;
+                if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 &&
+                    ((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) {
+                    booleanFuture.complete(true);
+                } else {
+                    booleanFuture.complete(false);
+                }
+            }).exceptionally(e -> {
+                log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
+                booleanFuture.completeExceptionally(e.getCause());
+                return null;
+            });
+        }
+        return booleanFuture;
+    }
+
+    private CompletableFuture<MessageId> getLastMessageIdAsync() {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil
+                .failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
+        }
+
+        if (cnx().getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {

Review comment:
If it's not connected `cnx()` will return null here.
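
The null-connection concern can be handled by checking the connection before reading anything from it, and failing the future instead of throwing a `NullPointerException`. The following sketch shows that ordering only; `ConnectionGuard`, the nested `Connection` interface, and `checkSupported` are hypothetical names, and the exception types are generic JDK ones rather than Pulsar's.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical guard: validate the connection first, then the protocol version.
class ConnectionGuard {

    // Minimal stand-in for whatever the connection accessor returns.
    interface Connection {
        int protocolVersion();
    }

    public static CompletableFuture<Void> checkSupported(Connection cnx, int minVersion) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (cnx == null) {
            // Not connected: report it through the future rather than dereferencing null.
            future.completeExceptionally(new IllegalStateException("Not connected to broker"));
        } else if (cnx.protocolVersion() < minVersion) {
            future.completeExceptionally(new UnsupportedOperationException(
                    "Not supported for protocol version: " + cnx.protocolVersion()));
        } else {
            future.complete(null);
        }
        return future;
    }
}
```

Reading the connection into a local variable once, as the parameter does here, also avoids a second accessor call observing a different state than the first.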
[GitHub] merlimat commented on issue #1207: Allow to configure most client/producer/consumer options in Kafka API wrapper
merlimat commented on issue #1207: Allow to configure most client/producer/consumer options in Kafka API wrapper
URL: https://github.com/apache/incubator-pulsar/pull/1207#issuecomment-364614000

retest this please
[GitHub] rdhabalia closed pull request #1208: Add hostname-verification at client tls connection
rdhabalia closed pull request #1208: Add hostname-verification at client tls connection URL: https://github.com/apache/incubator-pulsar/pull/1208 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/all/src/assemble/LICENSE.bin.txt b/all/src/assemble/LICENSE.bin.txt index 41f9000d6..a7e70defb 100644 --- a/all/src/assemble/LICENSE.bin.txt +++ b/all/src/assemble/LICENSE.bin.txt @@ -332,6 +332,8 @@ The Apache Software License, Version 2.0 * Jetty - org.eclipse.jetty-*.jar * SnakeYaml -- org.yaml-snakeyaml-*.jar * RocksDB - org.rocksdb.*.jar + * HttpClient - org.apache.httpcomponents.httpclient.jar + * CommonsLogging - commons-logging-*.jar BSD 3-clause "New" or "Revised" License * EA Agent Loader -- com.ea.agentloader-*.jar -- licenses/LICENSE-EA-Agent-Loader.txt diff --git a/pom.xml b/pom.xml index 27ff691fb..7320661d5 100644 --- a/pom.xml +++ b/pom.xml @@ -138,6 +138,18 @@ flexible messaging model and an intuitive client API. + +org.apache.httpcomponents +httpclient +4.5.5 + + +* +* + + + + org.testng testng @@ -760,6 +772,7 @@ flexible messaging model and an intuitive client API. 
**/*.crt **/*.key **/*.csr +**/*.pem **/*.json **/*.htpasswd src/test/resources/athenz.conf.test diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml index fd3ff68de..bda3037eb 100644 --- a/pulsar-broker-shaded/pom.xml +++ b/pulsar-broker-shaded/pom.xml @@ -104,6 +104,8 @@ org.aspectj:* com.ea.agentloader:* com.wordnik:swagger-annotations + org.apache.httpcomponents:httpclient + commons-logging:commons-logging @@ -298,6 +300,10 @@ com.wordnik org.apache.pulsar.shade.com.worknik + + org.apache.http + org.apache.pulsar.shade.org.apache.http + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index cd0415aab..31387696a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -19,8 +19,11 @@ package org.apache.pulsar.broker.service; import java.io.File; +import java.security.cert.X509Certificate; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.impl.auth.AuthenticationDataTls; import org.apache.pulsar.common.api.ByteBufPair; import org.apache.pulsar.common.api.PulsarDecoder; @@ -68,6 +71,17 @@ protected void initChannel(SocketChannel ch) throws Exception { builder.trustManager(trustCertCollection); } } + +ServiceConfiguration config = brokerService.pulsar().getConfiguration(); +String certFilePath = config.getTlsCertificateFilePath(); +String keyFilePath = config.getTlsKeyFilePath(); +if (StringUtils.isNotBlank(certFilePath) && StringUtils.isNotBlank(keyFilePath)) { +AuthenticationDataTls authTlsData = new AuthenticationDataTls(certFilePath, keyFilePath); +builder.keyManager(authTlsData.getTlsPrivateKey(), +(X509Certificate[]) authTlsData.getTlsCertificates()); +} + + SslContext 
sslCtx = builder.clientAuth(ClientAuth.OPTIONAL).build(); ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc())); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java new file mode 100644 index 0..5ccfc142b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticationTlsHostnameVerificationTest.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0
[incubator-pulsar] branch master updated: Add hostname-verification at client tls connection (#1208)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 8d3ab43 Add hostname-verification at client tls connection (#1208) 8d3ab43 is described below commit 8d3ab43cee86c9e49a54db13929a4ecb09e8152f Author: Rajan Dhabalia AuthorDate: Fri Feb 9 17:43:23 2018 -0800 Add hostname-verification at client tls connection (#1208) * Add hostname-verification at client tls connection * add httpclient dep with exclude all + add pem in apache-rat * add httpclient+commons-logging dep in client-shading and LICENSE * shade artifacts * fix: proxy send certs to client for host verification --- all/src/assemble/LICENSE.bin.txt | 2 + pom.xml| 13 ++ pulsar-broker-shaded/pom.xml | 6 + .../broker/service/PulsarChannelInitializer.java | 14 ++ .../AuthenticationTlsHostnameVerificationTest.java | 255 + .../tls/hn-verification/broker-cert.pem| 82 +++ .../tls/hn-verification/broker-key.pem | 28 +++ .../authentication/tls/hn-verification/cacert.pem | 79 +++ .../pulsar-client-kafka/pom.xml| 6 + pulsar-client-shaded/pom.xml | 6 + pulsar-client/pom.xml | 18 ++ .../pulsar/client/api/ClientConfiguration.java | 18 ++ .../org/apache/pulsar/client/impl/ClientCnx.java | 54 - .../apache/pulsar/client/impl/ConnectionPool.java | 2 + .../proxy/server/ServiceChannelInitializer.java| 14 ++ .../server/ProxyWithProxyAuthorizationTest.java| 51 - 16 files changed, 642 insertions(+), 6 deletions(-) diff --git a/all/src/assemble/LICENSE.bin.txt b/all/src/assemble/LICENSE.bin.txt index 41f9000..a7e70de 100644 --- a/all/src/assemble/LICENSE.bin.txt +++ b/all/src/assemble/LICENSE.bin.txt @@ -332,6 +332,8 @@ The Apache Software License, Version 2.0 * Jetty - org.eclipse.jetty-*.jar * SnakeYaml -- org.yaml-snakeyaml-*.jar * RocksDB - org.rocksdb.*.jar + * HttpClient - org.apache.httpcomponents.httpclient.jar + * 
CommonsLogging - commons-logging-*.jar BSD 3-clause "New" or "Revised" License * EA Agent Loader -- com.ea.agentloader-*.jar -- licenses/LICENSE-EA-Agent-Loader.txt diff --git a/pom.xml b/pom.xml index 27ff691..7320661 100644 --- a/pom.xml +++ b/pom.xml @@ -139,6 +139,18 @@ flexible messaging model and an intuitive client API. +org.apache.httpcomponents +httpclient +4.5.5 + + +* +* + + + + + org.testng testng 6.13.1 @@ -760,6 +772,7 @@ flexible messaging model and an intuitive client API. **/*.crt **/*.key **/*.csr +**/*.pem **/*.json **/*.htpasswd src/test/resources/athenz.conf.test diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml index fd3ff68..bda3037 100644 --- a/pulsar-broker-shaded/pom.xml +++ b/pulsar-broker-shaded/pom.xml @@ -104,6 +104,8 @@ org.aspectj:* com.ea.agentloader:* com.wordnik:swagger-annotations + org.apache.httpcomponents:httpclient + commons-logging:commons-logging @@ -298,6 +300,10 @@ com.wordnik org.apache.pulsar.shade.com.worknik + + org.apache.http + org.apache.pulsar.shade.org.apache.http + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index cd0415a..3138769 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -19,8 +19,11 @@ package org.apache.pulsar.broker.service; import java.io.File; +import java.security.cert.X509Certificate; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.impl.auth.AuthenticationDataTls; import org.apache.pulsar.common.api.ByteBufPair; import org.apache.pulsar.common.api.PulsarDecoder; @@ -68,6 +71,17 @@ public class PulsarChannelInitializer extends ChannelInitializer builder.trustManager(trustCertCollection); } 
} + +ServiceConfiguration conf
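
For readers unfamiliar with what client-side hostname verification involves, here is a small sketch using only the JDK. It is not the approach in the commit above, which adds an Apache HttpClient dependency; instead it swaps in the JDK's built-in endpoint identification, which checks the server certificate against the peer host name during the TLS handshake. The class and method names are hypothetical, and the `verifyHostname` flag merely mirrors the idea of a client option that turns the check on.

```java
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

// Hypothetical helper: build a client-side SSLEngine with optional hostname verification.
class HostnameVerificationSketch {

    public static SSLEngine newClientEngine(SSLContext context, String host, int port, boolean verifyHostname) {
        // Passing host and port lets the engine know which peer name to verify against.
        SSLEngine engine = context.createSSLEngine(host, port);
        engine.setUseClientMode(true);
        if (verifyHostname) {
            SSLParameters params = engine.getSSLParameters();
            // "HTTPS" selects RFC 2818 style matching of the certificate against the host name.
            params.setEndpointIdentificationAlgorithm("HTTPS");
            engine.setSSLParameters(params);
        }
        return engine;
    }
}
```

Keeping the flag separate from certificate trust settings matches the argument made earlier in the thread: trusting a certificate chain and checking that the certificate belongs to the host you dialed are two different decisions.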
[GitHub] maskit commented on issue #1208: Add hostname-verification at client tls connection
maskit commented on issue #1208: Add hostname-verification at client tls connection
URL: https://github.com/apache/incubator-pulsar/pull/1208#issuecomment-364618475

Actually, I was looking into the code and found a lot of duplicate code. The blocks for `trustManager` and `keyManager` look very much like `SecurityUtility::createNettySslContext`. Probably the only difference is whether the context is for a client or for a server. Using the `SecurityUtility` class would also remove the dependency on `org.apache.pulsar.client.impl.auth.AuthenticationDataTls`, since that is what `AuthenticationDataTls` uses internally.
[GitHub] sijie commented on a change in pull request #1156: Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group
sijie commented on a change in pull request #1156: Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167388187

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java ##

@@ -127,6 +129,33 @@ public ConsumerConfiguration setMessageListener(MessageListener messageListener)
         return this;
     }

+    /**
+     * @return this configured {@link ConsumerGroupListener} for the consumer.
+     * @see #setConsumerGroupListener(ConsumerGroupListener)
+     * @since 1.22.0
+     */
+    public ConsumerGroupListener getConsumerGroupListener() {

Review comment:
I renamed "ConsumerGroupListener" to "ActiveConsumerListener". I added logic in the Pulsar client to fail `subscribe` if a listener is set for a non-failover subscription.
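
The validation described in the comment above can be pictured as a precondition evaluated before subscribing. The sketch below is an assumption about its shape, not the code from the PR: `ListenerValidation`, its nested enum, and the use of `IllegalArgumentException` are all illustrative choices.

```java
// Hypothetical precondition: an active-consumer listener only makes sense for failover subscriptions.
class ListenerValidation {

    enum SubscriptionType { Exclusive, Shared, Failover }

    public static void validate(SubscriptionType type, Object activeConsumerListener) {
        if (activeConsumerListener != null && type != SubscriptionType.Failover) {
            throw new IllegalArgumentException(
                    "Active consumer listener is only supported for failover subscriptions, got: " + type);
        }
    }
}
```

Failing at subscribe time gives the user an immediate error instead of a listener that silently never fires.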
[GitHub] maskit opened a new pull request #1212: Support custom URL scheme handlers
maskit opened a new pull request #1212: Support custom URL scheme handlers URL: https://github.com/apache/incubator-pulsar/pull/1212 ### Motivation The Data URL scheme is implemented in the Athenz auth plugin, but it is not Athenz-specific and should be usable from other places. ### Modifications - Generalize custom URL scheme support ### Result - The Data URL scheme can be used from anywhere - Other custom schemes can be added easily ### Note I set the milestone to 2.0.0, but this change doesn't break any compatibility. The milestone is just for the release schedule.
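For readers unfamiliar with the Data URL scheme mentioned in this PR, the following is a minimal sketch of decoding a `data:` URL with only the JDK. It is not the code from the PR: the class name `DataUrlSketch` and the method `decode` are hypothetical, and percent-decoding of non-base64 payloads is deliberately left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of Pulsar: decodes "data:[<mediatype>][;base64],<data>".
public class DataUrlSketch {

    static byte[] decode(String url) {
        if (!url.startsWith("data:")) {
            throw new IllegalArgumentException("not a data URL: " + url);
        }
        int comma = url.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("missing ',' separator: " + url);
        }
        // Everything between "data:" and the first comma is the media type plus flags.
        String header = url.substring("data:".length(), comma);
        String payload = url.substring(comma + 1);
        if (header.endsWith(";base64")) {
            return Base64.getDecoder().decode(payload);
        }
        // Assumption: plain payloads are returned as-is; percent-decoding is omitted.
        return payload.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = decode("data:text/plain;base64,aGVsbG8=");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

A generalized scheme handler, as the PR proposes, would presumably dispatch on the scheme prefix and hand the remainder to a decoder of this kind.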
[GitHub] sijie commented on a change in pull request #1156: Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group
sijie commented on a change in pull request #1156: Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167388965 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java ## @@ -81,20 +82,33 @@ public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int part protected abstract void cancelPendingRead(); -protected void pickAndScheduleActiveConsumer() { +protected void notifyConsumerGroupChanged(Consumer activeConsumer) { +consumers.forEach(consumer -> +consumer.notifyConsumerGroupChange(activeConsumer.consumerId())); +} + +/** + * @return the previous active consumer if the consumer is changed, otherwise null. + */ +protected boolean pickAndScheduleActiveConsumer() { checkArgument(!consumers.isEmpty()); consumers.sort((c1, c2) -> c1.consumerName().compareTo(c2.consumerName())); int index = partitionIndex % consumers.size(); Consumer prevConsumer = ACTIVE_CONSUMER_UPDATER.getAndSet(this, consumers.get(index)); -if (prevConsumer == ACTIVE_CONSUMER_UPDATER.get(this)) { +Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this); +if (prevConsumer == activeConsumer) { // Active consumer did not change. Do nothing at this point -return; +return false; +} else { +// If the active consumer is changed, send notification. +notifyConsumerGroupChanged(activeConsumer); Review comment: @merlimat I have changed this to notify consumers of an active-consumer change only after the cursor is rewound. Please review it again.
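The selection rule in the hunk above (sort consumers by name, pick index `partitionIndex % consumers.size()`, and report whether the active consumer changed) can be illustrated in isolation. The sketch below is a simplified stand-in, not the broker code: `ActiveConsumerPicker` is a hypothetical class, and consumers are reduced to plain names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the failover selection shown in the diff.
public class ActiveConsumerPicker {
    private final int partitionIndex;
    private final List<String> consumers = new ArrayList<>();
    private final AtomicReference<String> active = new AtomicReference<>();

    ActiveConsumerPicker(int partitionIndex) {
        this.partitionIndex = partitionIndex;
    }

    /** Adds a consumer and returns true if the active consumer changed. */
    boolean addConsumer(String name) {
        consumers.add(name);
        return pickAndScheduleActiveConsumer();
    }

    String activeConsumer() {
        return active.get();
    }

    /** Returns true if the active consumer changed, false otherwise. */
    boolean pickAndScheduleActiveConsumer() {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("no consumers connected");
        }
        // Sorting by name makes the choice deterministic across reconnects.
        consumers.sort(Comparator.naturalOrder());
        String next = consumers.get(partitionIndex % consumers.size());
        String prev = active.getAndSet(next);
        return !next.equals(prev);
    }

    public static void main(String[] args) {
        ActiveConsumerPicker picker = new ActiveConsumerPicker(0);
        System.out.println(picker.addConsumer("consumer-b")); // first consumer becomes active
        System.out.println(picker.addConsumer("consumer-c")); // sorts after b, no change
        System.out.println(picker.addConsumer("consumer-a")); // sorts before b, takes over
    }
}
```

In this model a caller would send the change notification only when the method returns true, which mirrors the boolean return introduced in the diff.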
[GitHub] sijie commented on issue #1156: Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group
sijie commented on issue #1156: Introduce ConsumerGroupListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#issuecomment-364625065 @merlimat I have addressed your comments. Can you review this again? Once it looks good, I will rebase it onto the latest master.
[GitHub] merlimat closed pull request #1207: Allow to configure most client/producer/consumer options in Kafka API wrapper
merlimat closed pull request #1207: Allow to configure most client/producer/consumer options in Kafka API wrapper URL: https://github.com/apache/incubator-pulsar/pull/1207 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java index d3dc6e431..97cde46ba 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java @@ -56,7 +56,8 @@ import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.kafka.compat.MessageIdUtils; -import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig; import org.apache.pulsar.client.util.ConsumerName; import org.apache.pulsar.client.util.FutureUtil; import org.apache.pulsar.common.naming.DestinationName; @@ -80,6 +81,8 @@ private volatile boolean closed = false; +private final Properties properties; + private static class QueueItem { final org.apache.pulsar.client.api.Consumer consumer; final Message message; @@ -141,9 +144,9 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer keyDeserializ String serviceUrl = config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0); -Properties properties = new Properties(); +this.properties = new 
Properties(); config.originals().forEach((k, v) -> properties.put(k, v)); -ClientConfiguration clientConf = PulsarKafkaConfig.getClientConfiguration(properties); +ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties); // Since this client instance is going to be used just for the consumers, we can enable Nagle to group // all the acknowledgments sent to broker within a short time frame clientConf.setUseTcpNoDelay(false); @@ -201,7 +204,7 @@ public void subscribe(Collection topics, ConsumerRebalanceListener callb // acknowledgeCumulative() int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get(); -ConsumerConfiguration conf = new ConsumerConfiguration(); +ConsumerConfiguration conf = PulsarConsumerKafkaConfig.getConsumerConfiguration(properties); conf.setSubscriptionType(SubscriptionType.Failover); conf.setMessageListener(this); if (numberOfPartitions > 1) { diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java index 793a6418a..7b8bf9ab3 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java @@ -48,7 +48,8 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.kafka.compat.MessageIdUtils; -import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig; public class PulsarKafkaProducer implements Producer { @@ -106,15 +107,14 @@ private PulsarKafkaProducer(Map conf, 
Properties properties, Ser } String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0); -ClientConfiguration clientConf = PulsarKafkaConfig.getClientConfiguration(properties); +ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties); try { client = PulsarClient.create(serviceUrl, clientConf); } catch (PulsarClientException e) { throw new RuntimeException(e); } -pulsarProducerConf = new ProducerConfiguration(); -pulsarProducerConf.setBatchingEnabled(true); +pulsarProducerConf = PulsarProducerKafkaConfig.getProducerConfiguration(properties);
[incubator-pulsar] branch master updated: Allow to configure most client/producer/consumer options in Kafka API wrapper (#1207)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 16a554b Allow to configure most client/producer/consumer options in Kafka API wrapper (#1207) 16a554b is described below commit 16a554bfd5903482512d9a56ccb21dfd5c01 Author: Matteo Merli AuthorDate: Fri Feb 9 21:32:49 2018 -0800 Allow to configure most client/producer/consumer options in Kafka API wrapper (#1207) --- .../clients/consumer/PulsarKafkaConsumer.java | 11 ++-- .../clients/producer/PulsarKafkaProducer.java | 8 +-- ...fkaConfig.java => PulsarClientKafkaConfig.java} | 44 ++- .../kafka/compat/PulsarConsumerKafkaConfig.java| 50 + .../kafka/compat/PulsarProducerKafkaConfig.java| 64 ++ site/docs/latest/adaptors/KafkaWrapper.md | 42 +++--- 6 files changed, 203 insertions(+), 16 deletions(-) diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java index d3dc6e4..97cde46 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java @@ -56,7 +56,8 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.kafka.compat.MessageIdUtils; -import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig; import org.apache.pulsar.client.util.ConsumerName; import 
org.apache.pulsar.client.util.FutureUtil; import org.apache.pulsar.common.naming.DestinationName; @@ -80,6 +81,8 @@ public class PulsarKafkaConsumer implements Consumer, MessageListene private volatile boolean closed = false; +private final Properties properties; + private static class QueueItem { final org.apache.pulsar.client.api.Consumer consumer; final Message message; @@ -141,9 +144,9 @@ public class PulsarKafkaConsumer implements Consumer, MessageListene String serviceUrl = config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0); -Properties properties = new Properties(); +this.properties = new Properties(); config.originals().forEach((k, v) -> properties.put(k, v)); -ClientConfiguration clientConf = PulsarKafkaConfig.getClientConfiguration(properties); +ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties); // Since this client instance is going to be used just for the consumers, we can enable Nagle to group // all the acknowledgments sent to broker within a short time frame clientConf.setUseTcpNoDelay(false); @@ -201,7 +204,7 @@ public class PulsarKafkaConsumer implements Consumer, MessageListene // acknowledgeCumulative() int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get(); -ConsumerConfiguration conf = new ConsumerConfiguration(); +ConsumerConfiguration conf = PulsarConsumerKafkaConfig.getConsumerConfiguration(properties); conf.setSubscriptionType(SubscriptionType.Failover); conf.setMessageListener(this); if (numberOfPartitions > 1) { diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java index 793a641..7b8bf9a 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java +++ 
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java @@ -48,7 +48,8 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.kafka.compat.MessageIdUtils; -import org.apache.pulsar.client.kafka.compat.PulsarKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig; +import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig; public class PulsarKafkaProducer implements Producer { @@ -106,15 +107,14 @@ publ
[GitHub] maskit opened a new pull request #1213: Use SecurityUtility class
maskit opened a new pull request #1213: Use SecurityUtility class URL: https://github.com/apache/incubator-pulsar/pull/1213 ### Motivation - Duplicate code makes maintenance hard ### Modifications - Rename `createNettySslContext` to `createNettySslContextForClient` - Add `createNettySslContextForServer` - Use `createNettySslContextForServer` from `PulsarChannelInitializer` and `ServiceChannelInitializer` ### Result - Less duplicate code
[GitHub] maskit commented on a change in pull request #1213: Use SecurityUtility class
maskit commented on a change in pull request #1213: Use SecurityUtility class URL: https://github.com/apache/incubator-pulsar/pull/1213#discussion_r167390938 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java ## @@ -55,24 +47,12 @@ public ServiceChannelInitializer(ProxyService proxyService, ProxyConfiguration s @Override protected void initChannel(SocketChannel ch) throws Exception { if (enableTLS) { -File tlsCert = new File(serviceConfig.getTlsCertificateFilePath()); -File tlsKey = new File(serviceConfig.getTlsKeyFilePath()); -SslContextBuilder builder = SslContextBuilder.forServer(tlsCert, tlsKey); -// allows insecure connection -builder.trustManager(InsecureTrustManagerFactory.INSTANCE); -SslContext sslCtx = builder.clientAuth(ClientAuth.OPTIONAL).build(); +SslContext sslCtx = SecurityUtility.createNettySslContextForClient(true, Review comment: Note that the first argument, `allowInsecureConnection`, is `true` because the original code always allows insecure connections regardless of configuration. If that wasn't intentional, it needs to be fixed.
[GitHub] maskit commented on issue #1213: Use SecurityUtility class
maskit commented on issue #1213: Use SecurityUtility class URL: https://github.com/apache/incubator-pulsar/pull/1213#issuecomment-364630011 I know the `SecurityUtility` class itself should be refactored too, but I'll do that later to keep this PR simple and easy to review.
[GitHub] maskit commented on issue #1208: Add hostname-verification at client tls connection
maskit commented on issue #1208: Add hostname-verification at client tls connection URL: https://github.com/apache/incubator-pulsar/pull/1208#issuecomment-364630116 Submitted PR #1213 to address the code duplication.
[GitHub] rdhabalia commented on a change in pull request #1213: Use SecurityUtility class
rdhabalia commented on a change in pull request #1213: Use SecurityUtility class URL: https://github.com/apache/incubator-pulsar/pull/1213#discussion_r167391135 ## File path: pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ServiceChannelInitializer.java ## @@ -55,24 +47,12 @@ public ServiceChannelInitializer(ProxyService proxyService, ProxyConfiguration s @Override protected void initChannel(SocketChannel ch) throws Exception { if (enableTLS) { -File tlsCert = new File(serviceConfig.getTlsCertificateFilePath()); -File tlsKey = new File(serviceConfig.getTlsKeyFilePath()); -SslContextBuilder builder = SslContextBuilder.forServer(tlsCert, tlsKey); -// allows insecure connection -builder.trustManager(InsecureTrustManagerFactory.INSTANCE); -SslContext sslCtx = builder.clientAuth(ClientAuth.OPTIONAL).build(); +SslContext sslCtx = SecurityUtility.createNettySslContextForClient(true, Review comment: Can we also add an inline comment here, e.g. `SecurityUtility.createNettySslContextForClient(true /* to allow InsecureConnection */, ..)`?
[GitHub] rdhabalia opened a new pull request #1214: Support hostname verification on proxy to broker connection
rdhabalia opened a new pull request #1214: Support hostname verification on proxy to broker connection URL: https://github.com/apache/incubator-pulsar/pull/1214 ### Motivation In #1208, we added support for hostname verification in the client when it creates a TLS connection to a broker or proxy. However, if the proxy is not on the same local network as the broker, the proxy also needs to verify the hostname when it connects to the broker. ### Modifications Add an option to the proxy that forces it to perform hostname verification when it connects to the broker. ### Result The proxy can perform hostname verification when it connects to the broker.
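As a rough illustration of what "hostname verification" means at the TLS layer, the sketch below enables RFC 2818 style endpoint identification on a plain JDK `SSLEngine`. This is not the PR's implementation (Pulsar uses Netty and its own configuration flags): the class `HostnameVerificationSketch`, the method `newClientEngine`, and the host and port values are assumptions made for the example.

```java
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;

// Hypothetical helper, not part of Pulsar: toggles hostname verification on a client-side SSLEngine.
public class HostnameVerificationSketch {

    static SSLEngine newClientEngine(String peerHost, int peerPort, boolean verifyHostname) {
        SSLContext context;
        try {
            context = SSLContext.getDefault();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("default SSLContext unavailable", e);
        }
        // The peer host given here is what the certificate's CN/SAN is matched against.
        SSLEngine engine = context.createSSLEngine(peerHost, peerPort);
        engine.setUseClientMode(true);
        if (verifyHostname) {
            SSLParameters params = engine.getSSLParameters();
            // "HTTPS" selects the RFC 2818 server identity check during the handshake.
            params.setEndpointIdentificationAlgorithm("HTTPS");
            engine.setSSLParameters(params);
        }
        return engine;
    }

    public static void main(String[] args) {
        SSLEngine engine = newClientEngine("broker.example.com", 6651, true);
        System.out.println(engine.getSSLParameters().getEndpointIdentificationAlgorithm());
    }
}
```

A proxy that connects onward to a broker acts as a TLS client on that hop, so a flag of this kind would apply to the proxy's outbound engine in the same way.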