[GitHub] merlimat commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
merlimat commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r168087411

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ##
@@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+    // All topics should be in same namespace
+    protected NamespaceName namespaceName;
+
+    // Map , when get do ACK, consumer will by find by topic name
+    private final ConcurrentHashMap consumers;

Review comment:
I think 99% of the logic should be unchanged from `PartitionedConsumerImpl`. The only difference would be in the constructor (a list of topics vs. the number of partitions, which can easily be converted into a list of topics anyway) and in using the partition index in the message id (where, again, we could use the "partition" topic anyway, even in that case).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
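The review comment on #1103 notes that a partition count can easily be converted into a list of topics. A minimal Java sketch of that conversion follows. It assumes the `<topic>-partition-<index>` naming convention for partitions; the class and method names (`PartitionTopics`, `partitionTopicNames`) are hypothetical and are not taken from the pull request.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class PartitionTopics {
    // Assumption: partition i of a topic is named "<topic>-partition-<i>".
    static List<String> partitionTopicNames(String topic, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Build one topic name per partition index, in index order.
        return IntStream.range(0, numPartitions)
                .mapToObj(i -> topic + "-partition-" + i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Print the per-partition topic names for a 3-partition topic.
        partitionTopicNames("persistent://prop/ns/my-topic", 3)
                .forEach(System.out::println);
    }
}
```

With such a helper, a consumer written against a list of topics could in principle serve the partitioned case as well, which is the reuse the comment is pointing at.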
[GitHub] merlimat commented on issue #1211: Intermittent test failure int ProxyForwardAuthDataTest.testForwardAuthData
merlimat commented on issue #1211: Intermittent test failure int ProxyForwardAuthDataTest.testForwardAuthData
URL: https://github.com/apache/incubator-pulsar/issues/1211#issuecomment-365510063

Fixed in #1230
[GitHub] merlimat closed issue #1211: Intermittent test failure int ProxyForwardAuthDataTest.testForwardAuthData
merlimat closed issue #1211: Intermittent test failure int ProxyForwardAuthDataTest.testForwardAuthData
URL: https://github.com/apache/incubator-pulsar/issues/1211
[GitHub] merlimat commented on issue #1228: Force to pull docker build image to regenerate PulsarApi.java generat?
merlimat commented on issue #1228: Force to pull docker build image to regenerate PulsarApi.java generat?
URL: https://github.com/apache/incubator-pulsar/pull/1228#issuecomment-365509563

retest this please
[GitHub] merlimat commented on issue #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on issue #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#issuecomment-365509341

@sijie Change looks good, though there's still the issue at https://github.com/apache/incubator-pulsar/pull/1156/files#r167413443 . I think it is using the `consumerId` of the active consumer to decide whether other consumers are active or inactive, but we cannot compare `consumerId` across consumers.

The other comment was about making the Listener interface name more neutral, so that we can add more event handlers if (and when) the need arises, without having to add another listener.
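The concern about comparing `consumerId` across consumers can be illustrated with a small Java sketch. It assumes that a consumer id is only unique within one client connection, so two consumers on different connections may carry the same id. `ConsumerHandle`, `activeFlagsById`, and `activeFlagsByIdentity` are hypothetical names for illustration, not broker code.

```java
import java.util.ArrayList;
import java.util.List;

final class ActiveConsumerCheck {
    /** Hypothetical stand-in for a broker-side consumer. */
    static final class ConsumerHandle {
        final String connection; // hypothetical: identifies the client connection
        final long consumerId;   // assumed unique only within that connection

        ConsumerHandle(String connection, long consumerId) {
            this.connection = connection;
            this.consumerId = consumerId;
        }
    }

    // Problematic variant: decides "active" by comparing ids only.
    static List<Boolean> activeFlagsById(List<ConsumerHandle> all, ConsumerHandle active) {
        List<Boolean> flags = new ArrayList<>();
        for (ConsumerHandle c : all) {
            flags.add(c.consumerId == active.consumerId);
        }
        return flags;
    }

    // Safer variant: decides "active" by comparing the consumer objects themselves.
    static List<Boolean> activeFlagsByIdentity(List<ConsumerHandle> all, ConsumerHandle active) {
        List<Boolean> flags = new ArrayList<>();
        for (ConsumerHandle c : all) {
            flags.add(c == active);
        }
        return flags;
    }
}
```

Under that assumption, two consumers with id 0 on different connections would both be reported as active by the id-based check, while the identity-based check marks only the real active consumer.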
[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r168085822

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ActiveConsumerListener.java ##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Listener on the consumer state changes.
+ */
+public interface ActiveConsumerListener {

Review comment:
Sure, that makes sense. One other thing I was thinking is that we could make this listener more general, so that we could reuse it in the future for other kinds of notifications. Something like `ConsumerEventsListener`, so that it is more neutral? What do you think?
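One way to read the `ConsumerEventsListener` suggestion is as a single interface whose methods all have no-op defaults, so new event types can be added later without breaking existing implementations. The Java sketch below is only an illustration of that idea: the interface name comes from the comment, while the method names, their `String consumerName` parameter, and `RecordingListener` are assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Neutral listener: each event is a default no-op, so implementors
// override only the notifications they care about.
interface ConsumerEventsListener {
    // Hypothetical event: the consumer became the active one in a failover group.
    default void becameActive(String consumerName) { }

    // Hypothetical event: the consumer stopped being the active one.
    default void becameInactive(String consumerName) { }
}

// Example implementation that records only activation events.
final class RecordingListener implements ConsumerEventsListener {
    final List<String> events = new ArrayList<>();

    @Override
    public void becameActive(String consumerName) {
        events.add("active:" + consumerName);
    }
    // becameInactive is intentionally not overridden and stays a no-op.
}
```

Adding a third default method to the interface later would leave `RecordingListener` compiling unchanged, which is the extensibility the comment asks for.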
[GitHub] merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
merlimat commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r168085850

## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java ##
@@ -127,6 +129,33 @@ public ConsumerConfiguration setMessageListener(MessageListener messageListener)
         return this;
     }
 
+    /**
+     * @return this configured {@link ActiveConsumerListener} for the consumer.
+     * @see #setActiveConsumerListener(ActiveConsumerListener)
+     * @since 1.22.0

Review comment:
since 2.0
[GitHub] merlimat commented on issue #1089: PIP-12 Introduce builder for creating Producer Consumer Reader
merlimat commented on issue #1089: PIP-12 Introduce builder for creating Producer Consumer Reader
URL: https://github.com/apache/incubator-pulsar/pull/1089#issuecomment-365505550

> Once we have subscription for topics and pattern, we need add them in.

Yes, the idea was to have different ways to specify the topics:

```java
// Single topic
client.newConsumer().topic(MY_TOPIC).subscriptionName(SUB).build();

// List of topics
List<String> myList = ...;
client.newConsumer().topics(myList).subscriptionName(SUB).build();

// Regex
client.newConsumer().topicsRegex("test.*").subscriptionName(SUB).build();
```
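For the `topicsRegex("test.*")` form, one plausible reading is that the pattern selects topics out of a known list. The Java sketch below shows that selection using only `java.util.regex`. It assumes the pattern must match the whole topic name, which the comment does not specify; `TopicRegexFilter` and `matchingTopics` are hypothetical names.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicRegexFilter {
    // Returns the topics whose full name matches the regex, in input order.
    // Assumption: whole-name matching (Matcher.matches), not substring search.
    static List<String> matchingTopics(List<String> topics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(t -> pattern.matcher(t).matches())
                .collect(Collectors.toList());
    }
}
```

For example, given the names `test-a`, `test-b`, and `prod-a`, the pattern `test.*` would select only the first two.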
[GitHub] merlimat commented on issue #1089: PIP-12 Introduce builder for creating Producer Consumer Reader
merlimat commented on issue #1089: PIP-12 Introduce builder for creating Producer Consumer Reader
URL: https://github.com/apache/incubator-pulsar/pull/1089#issuecomment-365505133

This is ready for review. Once this is finalized and merged, I'll start converting all the code to use the new API.
[GitHub] merlimat closed pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat closed pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index e13664c27..9149bb9f9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -334,4 +334,11 @@ * @param config */ void setConfig(ManagedLedgerConfig config); + +/** + * Gets last confirmed entry of the managed ledger. + * + * @return the last confirmed entry id + */ +Position getLastConfirmedEntry(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 80a0bbed8..89d3476ea 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1062,7 +1062,7 @@ private void closeAllCursors(CloseCallback callback, final Object ctx) { Futures.waitForAll(futures).thenRun(() -> { callback.closeComplete(ctx); }).exceptionally(exception -> { - callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx); + callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()), ctx); return null; }); } @@ -1282,7 +1282,7 @@ void asyncReadEntries(OpReadEntry opReadEntry) { }).exceptionally(ex -> { log.error("[{}] Error opening ledger for reading at 
position {} - {}", name, opReadEntry.readPosition, ex.getMessage()); - opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), opReadEntry.ctx); + opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), opReadEntry.ctx); return null; }); } @@ -1351,7 +1351,7 @@ void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ct entryCache.asyncReadEntry(ledger, position, callback, ctx); }).exceptionally(ex -> { log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage()); - callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx); + callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx); return null; }); } @@ -2173,7 +2173,8 @@ public int getPendingAddEntriesCount() { return pendingAddEntries.size(); } -public PositionImpl getLastConfirmedEntry() { +@Override +public Position getLastConfirmedEntry() { return lastConfirmedEntry; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 0b8d51280..cb473c875 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -31,10 +31,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; - import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; - import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; @@ -59,6 +57,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats; import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse; import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow; +import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId; import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic; import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer; @@ -110,7 +109,7 @@ private String originalPrincipal = null; private Set proxyRoles; private boolean authenticateOriginalAuthData; - + enum State { Start, Connected, Failed } @@ -192,8 +191,8 @@ public void exce
[incubator-pulsar] branch master updated: Issue 937: add CommandGetLastMessageId to make reader know the end of topic (#1066)
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 7404952  Issue 937: add CommandGetLastMessageId to make reader know the end of topic (#1066)
7404952 is described below

commit 74049522a1d97e1171c9088acd638b426b6de015
Author: Jia Zhai
AuthorDate: Tue Feb 13 21:21:29 2018 -0800

    Issue 937: add CommandGetLastMessageId to make reader know the end of topic (#1066)

    * add CommandGetLastMessageId to getlastMessageId of topic
    * rebase master, change following comments
    * add partition index in GetLastMessageIdResponse
    * fix rebase error
    * bump proot version to v11
    * change following comments
    * change following comments2
    * change following comments3
    * change following comments
    * get cnx() first
---
 .../apache/bookkeeper/mledger/ManagedLedger.java   |    7 +
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |    9 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   52 +-
 .../org/apache/pulsar/broker/service/Topic.java    |    3 +
 .../service/nonpersistent/NonPersistentTopic.java  |    6 +
 .../broker/service/persistent/PersistentTopic.java |    5 +
 .../apache/pulsar/client/api/TopicReaderTest.java  |  107 +-
 .../java/org/apache/pulsar/client/api/Reader.java  |   10 +
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   37 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  131 ++-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |   11 +-
 .../pulsar/client/util/ExecutorProvider.java       |    2 +-
 .../org/apache/pulsar/common/api/Commands.java     |   41 +-
 .../apache/pulsar/common/api/PulsarDecoder.java    |   20 +
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 1026
 pulsar-common/src/main/proto/PulsarApi.proto       |   47 +-
 16 files changed, 1457 insertions(+), 57 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index e13664c..9149bb9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -334,4 +334,11 @@ public interface ManagedLedger { * @param config */ void setConfig(ManagedLedgerConfig config); + +/** + * Gets last confirmed entry of the managed ledger. + * + * @return the last confirmed entry id + */ +Position getLastConfirmedEntry(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 80a0bbe..89d3476 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1062,7 +1062,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { Futures.waitForAll(futures).thenRun(() -> { callback.closeComplete(ctx); }).exceptionally(exception -> { - callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx); + callback.closeFailed(ManagedLedgerException.getManagedLedgerException(exception.getCause()), ctx); return null; }); } @@ -1282,7 +1282,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { }).exceptionally(ex -> { log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition, ex.getMessage()); - opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), opReadEntry.ctx); + opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), opReadEntry.ctx); return null; }); } @@ -1351,7 +1351,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { entryCache.asyncReadEntry(ledger, position, callback, ctx); }).exceptionally(ex -> { 
log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage()); - callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx); + callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx); return null; }); } @@ -2173,7 +2173,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { return pendingAddEntries.size(); } -public Posi
[GitHub] sijie commented on issue #1223: Add a `backend` admin restful endpoint for query backend information
sijie commented on issue #1223: Add a `backend` admin restful endpoint for query backend information
URL: https://github.com/apache/incubator-pulsar/pull/1223#issuecomment-365475832

Okay will change it to InternalConfiguration
[GitHub] rdhabalia commented on issue #1223: Add a `backend` admin restful endpoint for query backend information
rdhabalia commented on issue #1223: Add a `backend` admin restful endpoint for query backend information
URL: https://github.com/apache/incubator-pulsar/pull/1223#issuecomment-365474351

> the purpose of this PR is to provide a method for application to know which zk, bk that a pulsar cluster is using.

InternalConfiguration/InternalData? I think we can add more configuration other than zk in the future, since this endpoint can surface internal configuration. So, should we give it a generic name, e.g. `InternalConfiguration`?
[incubator-pulsar] branch master updated: Added debug logs in MessageCrypto (#1233)
This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new fbb42e7 Added debug logs in MessageCrypto (#1233) fbb42e7 is described below commit fbb42e711aa9044ec5a1b30f448173432c1d805a Author: Andrews AuthorDate: Tue Feb 13 18:12:13 2018 -0800 Added debug logs in MessageCrypto (#1233) --- pulsar-client-cpp/lib/MessageCrypto.cc | 101 + pulsar-client-cpp/lib/MessageCrypto.h | 1 + 2 files changed, 77 insertions(+), 25 deletions(-) diff --git a/pulsar-client-cpp/lib/MessageCrypto.cc b/pulsar-client-cpp/lib/MessageCrypto.cc index 26f1b48..0cc5dec 100644 --- a/pulsar-client-cpp/lib/MessageCrypto.cc +++ b/pulsar-client-cpp/lib/MessageCrypto.cc @@ -87,18 +87,18 @@ RSA* MessageCrypto::loadPrivateKey(std::string& privateKeyStr) { bool MessageCrypto::getDigest(const std::string& keyName, const void* input, unsigned int inputLen, unsigned char keyDigest[], unsigned int& digestLen) { if (EVP_DigestInit_ex(mdCtx_, EVP_md5(), NULL) != 1) { -LOG_ERROR(logCtx_ + "Failed to initialize md5 digest for key " + keyName); +LOG_ERROR(logCtx_ << "Failed to initialize md5 digest for key " << keyName); return false; } digestLen = 0; if (EVP_DigestUpdate(mdCtx_, input, inputLen) != 1) { -LOG_ERROR(logCtx_ + "Failed to get md5 hash for data key " + keyName); +LOG_ERROR(logCtx_ << "Failed to get md5 hash for data key " << keyName); return false; } if (EVP_DigestFinal_ex(mdCtx_, keyDigest, &digestLen) != 1) { -LOG_ERROR(logCtx_ + "Failed to finalize md hash for data key " + keyName); +LOG_ERROR(logCtx_ << "Failed to finalize md hash for data key " << keyName); return false; } @@ -122,6 +122,21 @@ void MessageCrypto::removeExpiredDataKey() { } } +std::string MessageCrypto::stringToHex(const std::string& inputStr, size_t len) { +static const char* hexVals = "0123456789ABCDEF"; + +std::string outHex; 
+outHex.reserve(2 * len + 2); +outHex.push_back('0'); +outHex.push_back('x'); +for (size_t i = 0; i < len; ++i) { +const unsigned char c = inputStr[i]; +outHex.push_back(hexVals[c >> 4]); +outHex.push_back(hexVals[c & 15]); +} +return outHex; +} + Result MessageCrypto::addPublicKeyCipher(std::set& keyNames, const CryptoKeyReaderPtr keyReader) { Lock lock(mutex_); @@ -141,7 +156,7 @@ Result MessageCrypto::addPublicKeyCipher(std::set& keyNames, Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader) { if (keyName.empty()) { -LOG_ERROR(logCtx_ + "Keyname is empty "); +LOG_ERROR(logCtx_ << "Keyname is empty "); return ResultCryptoError; } @@ -150,15 +165,16 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt EncryptionKeyInfo keyInfo; Result result = keyReader->getPublicKey(keyName, keyMeta, keyInfo); if (result != ResultOk) { -LOG_ERROR(logCtx_ + "Failed to get public key from KeyReader for key " + keyName); +LOG_ERROR(logCtx_ << "Failed to get public key from KeyReader for key " << keyName); return result; } RSA* pubKey = loadPublicKey(keyInfo.getKey()); if (pubKey == NULL) { -LOG_ERROR(logCtx_ + "Failed to load public key " + keyName); +LOG_ERROR(logCtx_ << "Failed to load public key " << keyName); return ResultCryptoError; } +LOG_DEBUG(logCtx_ << " Public key " << keyName << " loaded successfully."); int inSize = RSA_size(pubKey); boost::scoped_array encryptedKey(new unsigned char[inSize]); @@ -167,7 +183,7 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt RSA_public_encrypt(dataKeyLen_, dataKey_.get(), encryptedKey.get(), pubKey, RSA_PKCS1_OAEP_PADDING); if (inSize != outSize) { -LOG_ERROR(logCtx_ + "Ciphertext is length not matching input key length for key " + keyName); +LOG_ERROR(logCtx_ << "Ciphertext is length not matching input key length for key " << keyName); return ResultCryptoError; } std::string 
encryptedKeyStr(reinterpret_cast(encryptedKey.get()), inSize); @@ -176,6 +192,11 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt eki->setMetadata(keyInfo.getMetadata()); encryptedDataKeyMap_.insert(std::make_pair(keyName, eki)); +if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { +std::string strHex = stringToHex(encryptedKeyStr, encryptedKeyStr.size()); +LOG_DEBUG(logCtx_ << " Data key encrypted for key " << keyName + <<
[GitHub] rdhabalia closed pull request #1233: Added debug logs in MessageCrypto
rdhabalia closed pull request #1233: Added debug logs in MessageCrypto URL: https://github.com/apache/incubator-pulsar/pull/1233 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/lib/MessageCrypto.cc b/pulsar-client-cpp/lib/MessageCrypto.cc index 26f1b484e..0cc5decac 100644 --- a/pulsar-client-cpp/lib/MessageCrypto.cc +++ b/pulsar-client-cpp/lib/MessageCrypto.cc @@ -87,18 +87,18 @@ RSA* MessageCrypto::loadPrivateKey(std::string& privateKeyStr) { bool MessageCrypto::getDigest(const std::string& keyName, const void* input, unsigned int inputLen, unsigned char keyDigest[], unsigned int& digestLen) { if (EVP_DigestInit_ex(mdCtx_, EVP_md5(), NULL) != 1) { -LOG_ERROR(logCtx_ + "Failed to initialize md5 digest for key " + keyName); +LOG_ERROR(logCtx_ << "Failed to initialize md5 digest for key " << keyName); return false; } digestLen = 0; if (EVP_DigestUpdate(mdCtx_, input, inputLen) != 1) { -LOG_ERROR(logCtx_ + "Failed to get md5 hash for data key " + keyName); +LOG_ERROR(logCtx_ << "Failed to get md5 hash for data key " << keyName); return false; } if (EVP_DigestFinal_ex(mdCtx_, keyDigest, &digestLen) != 1) { -LOG_ERROR(logCtx_ + "Failed to finalize md hash for data key " + keyName); +LOG_ERROR(logCtx_ << "Failed to finalize md hash for data key " << keyName); return false; } @@ -122,6 +122,21 @@ void MessageCrypto::removeExpiredDataKey() { } } +std::string MessageCrypto::stringToHex(const std::string& inputStr, size_t len) { +static const char* hexVals = "0123456789ABCDEF"; + +std::string outHex; +outHex.reserve(2 * len + 2); +outHex.push_back('0'); +outHex.push_back('x'); +for (size_t i = 0; i < len; ++i) { +const unsigned char c = inputStr[i]; +outHex.push_back(hexVals[c >> 4]); +outHex.push_back(hexVals[c & 15]); +} +return 
outHex; +} + Result MessageCrypto::addPublicKeyCipher(std::set& keyNames, const CryptoKeyReaderPtr keyReader) { Lock lock(mutex_); @@ -141,7 +156,7 @@ Result MessageCrypto::addPublicKeyCipher(std::set& keyNames, Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader) { if (keyName.empty()) { -LOG_ERROR(logCtx_ + "Keyname is empty "); +LOG_ERROR(logCtx_ << "Keyname is empty "); return ResultCryptoError; } @@ -150,15 +165,16 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt EncryptionKeyInfo keyInfo; Result result = keyReader->getPublicKey(keyName, keyMeta, keyInfo); if (result != ResultOk) { -LOG_ERROR(logCtx_ + "Failed to get public key from KeyReader for key " + keyName); +LOG_ERROR(logCtx_ << "Failed to get public key from KeyReader for key " << keyName); return result; } RSA* pubKey = loadPublicKey(keyInfo.getKey()); if (pubKey == NULL) { -LOG_ERROR(logCtx_ + "Failed to load public key " + keyName); +LOG_ERROR(logCtx_ << "Failed to load public key " << keyName); return ResultCryptoError; } +LOG_DEBUG(logCtx_ << " Public key " << keyName << " loaded successfully."); int inSize = RSA_size(pubKey); boost::scoped_array encryptedKey(new unsigned char[inSize]); @@ -167,7 +183,7 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt RSA_public_encrypt(dataKeyLen_, dataKey_.get(), encryptedKey.get(), pubKey, RSA_PKCS1_OAEP_PADDING); if (inSize != outSize) { -LOG_ERROR(logCtx_ + "Ciphertext is length not matching input key length for key " + keyName); +LOG_ERROR(logCtx_ << "Ciphertext is length not matching input key length for key " << keyName); return ResultCryptoError; } std::string encryptedKeyStr(reinterpret_cast(encryptedKey.get()), inSize); @@ -176,6 +192,11 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt eki->setMetadata(keyInfo.getMetadata()); encryptedDataKeyMap_.insert(std::make_pair(keyName, eki)); +if 
(LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { +std::string strHex = stringToHex(encryptedKeyStr, encryptedKeyStr.size()); +LOG_DEBUG(logCtx_ << " Data key encrypted for key " << keyName + << ". Encrypted key size = " << encryptedKeyStr.size() << ", value = " << strHex); +} return ResultOk; } @@ -212,7 +233,7 @@ bool MessageCrypto::encrypt(std::set& encKeys, const CryptoKeyReade keyInfoIter = encryptedDataKeyMap_.find(keyName);
[GitHub] rdhabalia opened a new pull request #1235: Add non-persistent topic stats separately in brokers-stat
rdhabalia opened a new pull request #1235: Add non-persistent topic stats separately in brokers-stat
URL: https://github.com/apache/incubator-pulsar/pull/1235

### Motivation

Right now, broker-stats reports both persistent and non-persistent topic stats under the `persistent` topic section. We need them in separate sections to manage them operationally.

### Modifications

Add non-persistent topics under a separate section.

### Result

Broker-stats will show non-persistent topics in a separate section.
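The split described in #1235 can be pictured as grouping topics by their domain prefix. The Java sketch below is not the broker-stats code; it only assumes that non-persistent topic names start with `non-persistent://` and that every other topic is persistent. The section keys and the names `BrokerStatsSections` and `splitByDomain` are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BrokerStatsSections {
    // Groups topic names into a "persistent" and a "non-persistent" section.
    // Assumption: anything not prefixed with "non-persistent://" is persistent.
    static Map<String, List<String>> splitByDomain(List<String> topicNames) {
        Map<String, List<String>> sections = new LinkedHashMap<>();
        sections.put("persistent", new ArrayList<>());
        sections.put("non-persistent", new ArrayList<>());
        for (String topic : topicNames) {
            String key = topic.startsWith("non-persistent://") ? "non-persistent" : "persistent";
            sections.get(key).add(topic);
        }
        return sections;
    }
}
```

Keeping the two groups under separate keys is what would let operators inspect or alert on non-persistent topics independently.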
[GitHub] zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168061308 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1248,6 +1256,102 @@ public void seek(MessageId messageId) throws PulsarClientException { return seekFuture; } +public boolean hasMessageAvailable() throws PulsarClientException { +try { +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +return true; +} + +return hasMessageAvailableAsync().get(); +} catch (ExecutionException | InterruptedException e) { +throw new PulsarClientException(e); +} +} + +public CompletableFuture hasMessageAvailableAsync() { +final CompletableFuture booleanFuture = new CompletableFuture<>(); + +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +getLastMessageIdAsync().thenAccept(messageId -> { +lastMessageIdInBroker = messageId; +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +booleanFuture.complete(false); +} +}).exceptionally(e -> { +log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); +booleanFuture.completeExceptionally(e.getCause()); +return null; +}); +} +return booleanFuture; +} + +private CompletableFuture getLastMessageIdAsync() { +if (getState() == State.Closing || getState() == State.Closed) { +return FutureUtil +.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); +} + +AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); +Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, +opTimeoutMs.get() * 2, 
TimeUnit.MILLISECONDS, +0 , TimeUnit.MILLISECONDS); +CompletableFuture getLastMessageIdFuture = new CompletableFuture<>(); + +internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture); +return getLastMessageIdFuture; +} + +private void internalGetLastMessageIdAsync(final Backoff backoff, + final AtomicLong remainingTime, + CompletableFuture future) { +if (isConnected()) { Review comment: Thanks, will change it.
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168059642 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1248,6 +1256,102 @@ public void seek(MessageId messageId) throws PulsarClientException { return seekFuture; } +public boolean hasMessageAvailable() throws PulsarClientException { +try { +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +return true; +} + +return hasMessageAvailableAsync().get(); +} catch (ExecutionException | InterruptedException e) { +throw new PulsarClientException(e); +} +} + +public CompletableFuture hasMessageAvailableAsync() { +final CompletableFuture booleanFuture = new CompletableFuture<>(); + +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +getLastMessageIdAsync().thenAccept(messageId -> { +lastMessageIdInBroker = messageId; +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +booleanFuture.complete(false); +} +}).exceptionally(e -> { +log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); +booleanFuture.completeExceptionally(e.getCause()); +return null; +}); +} +return booleanFuture; +} + +private CompletableFuture getLastMessageIdAsync() { +if (getState() == State.Closing || getState() == State.Closed) { +return FutureUtil +.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); +} + +AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs()); +Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, +opTimeoutMs.get() * 2, 
TimeUnit.MILLISECONDS, +0 , TimeUnit.MILLISECONDS); +CompletableFuture getLastMessageIdFuture = new CompletableFuture<>(); + +internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture); +return getLastMessageIdFuture; +} + +private void internalGetLastMessageIdAsync(final Backoff backoff, + final AtomicLong remainingTime, + CompletableFuture future) { +if (isConnected()) { Review comment: I missed this before, but we should make sure the connection doesn't change while we're executing this method. `isConnected()` checks the current connection, but that might have changed by the time we ask for `cnx()` a few lines below. We need to first get a reference to the `ClientCnx` and use that throughout the method.
```java
ClientCnx cnx = cnx();
if (cnx != null) {
    // check cnx.getRemoteEndpointProtocolVersion();
    cnx.sendGetLastMessageId()...
}
```
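The point of the review comment above is that a shared connection reference should be read once and then reused, because a second read can observe a different (or null) connection. A minimal sketch of that pattern follows. `SketchCnx` and `SketchConsumer` are hypothetical stand-ins rather than Pulsar classes, and the minimum version of 12 is assumed from the `ProtocolVersion.v12` check in the diff.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ClientCnx; only the protocol version matters here.
final class SketchCnx {
    private final int protocolVersion;

    SketchCnx(int protocolVersion) {
        this.protocolVersion = protocolVersion;
    }

    int getRemoteEndpointProtocolVersion() {
        return protocolVersion;
    }
}

// Hypothetical consumer that holds a connection another thread may swap or clear.
final class SketchConsumer {
    // Assumed minimum broker protocol version for GetLastMessageId.
    static final int MIN_VERSION = 12;

    private final AtomicReference<SketchCnx> connection = new AtomicReference<>();

    void setConnection(SketchCnx cnx) {
        connection.set(cnx);
    }

    SketchCnx cnx() {
        return connection.get();
    }

    // Returns "sent", "unsupported", or "retry" so the outcome is easy to inspect.
    String trySendGetLastMessageId() {
        SketchCnx cnx = cnx(); // read the shared reference exactly once
        if (cnx == null) {
            return "retry"; // not connected: the caller would schedule a retry
        }
        if (cnx.getRemoteEndpointProtocolVersion() < MIN_VERSION) {
            return "unsupported";
        }
        // A real client would write the request on this same cnx here.
        return "sent";
    }
}
```

Every decision in `trySendGetLastMessageId` is made against the same local `cnx`, so a reconnect that happens midway cannot make the null check and the version check disagree.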
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168054861 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -145,6 +151,9 @@ this.batchMessageAckTracker = new ConcurrentSkipListMap<>(); this.readCompacted = conf.getReadCompacted(); +this.getLastIdExecutor = Executors Review comment: This would create one thread per consumer; we should reuse the executor that is already available from `PulsarClientImpl`.
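To illustrate the concern in the comment above, here is a rough sketch of a client that owns one scheduler and lends it to every consumer it creates, so the thread count does not grow with the number of consumers. `SharedExecutorClient` and `SharedExecutorConsumer` are hypothetical names for illustration; how `PulsarClientImpl` actually exposes its executor is not shown in this thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical client that owns a single scheduler for all of its consumers.
final class SharedExecutorClient implements AutoCloseable {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    ScheduledExecutorService timer() {
        return timer;
    }

    SharedExecutorConsumer newConsumer(String topic) {
        return new SharedExecutorConsumer(this, topic);
    }

    @Override
    public void close() {
        // Only the owner shuts the scheduler down.
        timer.shutdownNow();
    }
}

// Hypothetical consumer that borrows the client's scheduler instead of creating its own.
final class SharedExecutorConsumer {
    private final ScheduledExecutorService timer; // borrowed, never shut down here
    private final String topic;

    SharedExecutorConsumer(SharedExecutorClient client, String topic) {
        this.timer = client.timer();
        this.topic = topic;
    }

    ScheduledExecutorService timer() {
        return timer;
    }

    String topic() {
        return topic;
    }
}
```

The design choice is about ownership: the consumer never creates or closes the executor, so closing a consumer cannot leak a thread or stop work scheduled by its siblings.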
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168054738 ## File path: pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java ## @@ -952,4 +952,8 @@ public static ByteBuf newLookup(String topic, boolean authoritative, String orig lookupBroker.recycle(); return res; } + +public static boolean peerSupportsGetLastMessageId() { +return getCurrentProtocolVersion() >= ProtocolVersion.v12.getNumber(); Review comment: This only checks our own protocol version (which is always the "latest" from when we compiled the protobuf), whereas we need to check the version of the other side, in this case the broker. It should look like:
```java
public static boolean peerSupportsGetLastMessageId(ClientCnx cnx) {
    return cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v12.getNumber();
}
```
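The difference between the two checks can be shown with a small self-contained sketch. `VersionCheckSketch` is a hypothetical class, and the numeric value 12 for `ProtocolVersion.v12` is an assumption made for illustration.

```java
// Hypothetical illustration of local versus remote protocol version checks.
final class VersionCheckSketch {
    // Assumed numeric value of ProtocolVersion.v12.
    static final int V12 = 12;

    // The version compiled into this client; it is always the newest the client knows.
    static int currentProtocolVersion() {
        return V12;
    }

    // Checks only the local version, so it is true regardless of the broker.
    static boolean localCheck() {
        return currentProtocolVersion() >= V12;
    }

    // Checks the version the remote endpoint advertised on the connection.
    static boolean peerSupportsGetLastMessageId(int remoteEndpointProtocolVersion) {
        return remoteEndpointProtocolVersion >= V12;
    }
}
```

With a broker that advertises version 11, `localCheck()` still returns true while `peerSupportsGetLastMessageId(11)` returns false, which is exactly the case the review comment is guarding against.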
[GitHub] hrsakai opened a new pull request #1234: Make max clients per topic/subscription configurable
hrsakai opened a new pull request #1234: Make max clients per topic/subscription configurable URL: https://github.com/apache/incubator-pulsar/pull/1234 ### Motivation Currently, the broker does not limit the maximum number of clients. Therefore, if a client implementation is incorrect, the number of clients may grow without bound. ### Modifications * Add max producers per topic settings to `ServerConfiguration` * Add max consumers per topic/subscription settings to `ServerConfiguration` ### Result We can set the maximum number of clients and prevent the number of clients from growing without bound. Next, we plan to enable namespaces to have custom values by using the `REST API` interface.
[GitHub] saandrews opened a new pull request #1233: Added debug logs in MessageCrypto
saandrews opened a new pull request #1233: Added debug logs in MessageCrypto URL: https://github.com/apache/incubator-pulsar/pull/1233 Added additional logging to help debugging.
[GitHub] zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168036825 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1248,6 +1254,103 @@ public void seek(MessageId messageId) throws PulsarClientException { return seekFuture; } +public boolean hasMessageAvailable() throws PulsarClientException { +try { +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +return true; +} + +return hasMessageAvailableAsync().get(); +} catch (ExecutionException | InterruptedException e) { +throw new PulsarClientException(e); +} +} + +public CompletableFuture hasMessageAvailableAsync() { +final CompletableFuture booleanFuture = new CompletableFuture<>(); + +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +getLastMessageIdAsync().thenAccept(messageId -> { +lastMessageIdInBroker = messageId; +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +booleanFuture.complete(false); +} +}).exceptionally(e -> { +log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); +booleanFuture.completeExceptionally(e.getCause()); +return null; +}); +} +return booleanFuture; +} + +private CompletableFuture getLastMessageIdAsync() { +if (getState() == State.Closing || getState() == State.Closed) { +return FutureUtil +.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); +} + +if (!isConnected()) { +long opTimeoutMs = client.getConfiguration().getOperationTimeoutMs(); Review comment: Thanks, will change this. 
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168005589 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1248,6 +1254,103 @@ public void seek(MessageId messageId) throws PulsarClientException { return seekFuture; } +public boolean hasMessageAvailable() throws PulsarClientException { +try { +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +return true; +} + +return hasMessageAvailableAsync().get(); +} catch (ExecutionException | InterruptedException e) { +throw new PulsarClientException(e); +} +} + +public CompletableFuture hasMessageAvailableAsync() { +final CompletableFuture booleanFuture = new CompletableFuture<>(); + +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +getLastMessageIdAsync().thenAccept(messageId -> { +lastMessageIdInBroker = messageId; +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +booleanFuture.complete(false); +} +}).exceptionally(e -> { +log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); +booleanFuture.completeExceptionally(e.getCause()); +return null; +}); +} +return booleanFuture; +} + +private CompletableFuture getLastMessageIdAsync() { +if (getState() == State.Closing || getState() == State.Closed) { +return FutureUtil +.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); +} + +if (!isConnected()) { +long opTimeoutMs = client.getConfiguration().getOperationTimeoutMs(); Review comment: This implementation will block the caller of an asynchronous method, which
might be unexpected. One way to do the retries would be to have an internal method that calls itself asynchronously, in recursion. For example, something like:
```java
private CompletableFuture internalGetLastMessageIdAsync(Backoff backoff, long remainingTime) {
    if (connected) {
        // write on socket and return future
    } else {
        // if time is not elapsed yet...
        long nextDelay = backoff.next();
        executor.schedule(() -> {
            // pass the reduced budget along instead of mutating the parameter
            internalGetLastMessageIdAsync(backoff, remainingTime - timeSpentSinceLastCall);
        }, nextDelay, TimeUnit.MILLISECONDS);
    }
}
```
(Don't read too much into the previous example, I'm just trying to illustrate the basic idea.)
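The idea sketched in the comment above, retrying without blocking the caller, can be written as a small runnable example. This is only an illustration under stated assumptions: `AsyncRetrySketch` is a hypothetical helper, the delay simply doubles on each attempt rather than using Pulsar's `Backoff` class, and the "request" is a plain `Supplier` instead of a socket write.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical helper: retries on a scheduler until connected or out of time.
final class AsyncRetrySketch {

    // Completes `result` with the request's value once `connected` reports true.
    // While disconnected, it reschedules itself with a doubled delay and a reduced
    // time budget; when the budget is exhausted it fails the future instead.
    static <T> void retry(ScheduledExecutorService executor,
                          BooleanSupplier connected,
                          Supplier<T> request,
                          long delayMs,
                          long remainingMs,
                          CompletableFuture<T> result) {
        if (connected.getAsBoolean()) {
            try {
                result.complete(request.get());
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
            return;
        }
        if (remainingMs <= 0) {
            result.completeExceptionally(new TimeoutException("not connected before timeout"));
            return;
        }
        // Never wait longer than the time that is left.
        long wait = Math.min(delayMs, remainingMs);
        executor.schedule(
                () -> retry(executor, connected, request, delayMs * 2, remainingMs - wait, result),
                wait, TimeUnit.MILLISECONDS);
    }
}
```

The calling thread returns immediately in every branch; all waiting happens on the scheduler, and the remaining budget is passed as an argument to the next attempt rather than mutated inside the lambda.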
[GitHub] merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
merlimat commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r168006084 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1248,6 +1254,103 @@ public void seek(MessageId messageId) throws PulsarClientException { return seekFuture; } +public boolean hasMessageAvailable() throws PulsarClientException { +try { +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +return true; +} + +return hasMessageAvailableAsync().get(); +} catch (ExecutionException | InterruptedException e) { +throw new PulsarClientException(e); +} +} + +public CompletableFuture hasMessageAvailableAsync() { +final CompletableFuture booleanFuture = new CompletableFuture<>(); + +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +getLastMessageIdAsync().thenAccept(messageId -> { +lastMessageIdInBroker = messageId; +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && +((MessageIdImpl)lastMessageIdInBroker).getEntryId() != -1) { +booleanFuture.complete(true); +} else { +booleanFuture.complete(false); +} +}).exceptionally(e -> { +log.error("[{}][{}] Failed getLastMessageId command", topic, subscription); +booleanFuture.completeExceptionally(e.getCause()); +return null; +}); +} +return booleanFuture; +} + +private CompletableFuture getLastMessageIdAsync() { +if (getState() == State.Closing || getState() == State.Closed) { +return FutureUtil +.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed")); +} + +if (!isConnected()) { +long opTimeoutMs = client.getConfiguration().getOperationTimeoutMs(); +Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, +opTimeoutMs * 2, 
TimeUnit.MILLISECONDS, +0 , TimeUnit.MILLISECONDS); + +long delayMs = backoff.firstBackoffTimeInMillis;; +while (delayMs < opTimeoutMs && !isConnected()); { +log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms", +topic, getHandlerName(), delayMs); +try { +Thread.sleep(delayMs); +} catch (InterruptedException e) { +return FutureUtil +.failedFuture(new PulsarClientException +.ConnectException("InterruptedException, could not connect")); +} +delayMs = backoff.next(); +} + +if (!isConnected()) { +return FutureUtil.failedFuture(new PulsarClientException("Not connected to broker")); +} +} + +if (cnx().getRemoteEndpointProtocolVersion() < ProtocolVersion.v12.getNumber()) { Review comment: Nit: please wrap this in a method like:
```java
boolean Commands.peerSupportsGetLastMessageId(ClientCnx cnx);
```
[GitHub] mgodave commented on a change in pull request #1232: Schema registry (1/4)
mgodave commented on a change in pull request #1232: Schema registry (1/4) URL: https://github.com/apache/incubator-pulsar/pull/1232#discussion_r167996411 ## File path: pulsar-common/src/main/proto/PulsarApi.proto ## @@ -272,6 +289,8 @@ message CommandProducer { /// Add optional metadata key=value to this producer repeated KeyValue metadata= 6; + + optional int64 schema_version = 7; Review comment: This seems to be an oversight on my part. Since this series of changes is concerned with a 'repository' I appear to have glossed over this point.
[GitHub] mgodave commented on a change in pull request #1232: Schema registry (1/4)
mgodave commented on a change in pull request #1232: Schema registry (1/4) URL: https://github.com/apache/incubator-pulsar/pull/1232#discussion_r167996154 ## File path: pulsar-common/src/main/proto/PulsarApi.proto ## @@ -22,6 +22,20 @@ package pulsar.proto; option java_package = "org.apache.pulsar.common.api.proto"; option optimize_for = LITE_RUNTIME; +message Schema { + required string name = 1; + required bytes version = 2; + required bytes schema_data = 7; +repeated KeyValue properties = 8; +} Review comment: I took a comment from the PIP email thread regarding the arbitrariness of the fields chosen in the original proposal. It occurred to me that schema type is equally arbitrary. First, adding a new type would require a binary protocol change, a newly cut release, and a coordinated client/server deployment. By removing the schema as a hard-coded field we're making the choice of how to identify it an end-to-end problem. If, as you commented above, we need the client to send the actual schema (which is an oversight on my part), then we only need to compare the schemas. If we need to compare them semantically, then we can devise a server-side plugin scheme that lets us identify and compare "like" versions ("is this Avro schema 'compatible' with this other Avro schema?", for instance).
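The server-side plugin scheme mentioned in the comment above is not specified in this thread. As a purely hypothetical sketch of what such a contract could look like, the interface below compares two opaque schema payloads, with a byte-for-byte implementation covering the "we only need to compare the schemas" case. Neither `SchemaCompatibilityCheck` nor `ExactMatchCheck` is an existing Pulsar type.

```java
import java.util.Arrays;

// Hypothetical plugin contract; a format-aware plugin (for example for Avro)
// could implement semantic compatibility behind the same method.
interface SchemaCompatibilityCheck {
    boolean isCompatible(byte[] existingSchema, byte[] proposedSchema);
}

// Simplest possible plugin: schemas are compatible only if the bytes are identical.
final class ExactMatchCheck implements SchemaCompatibilityCheck {
    @Override
    public boolean isCompatible(byte[] existingSchema, byte[] proposedSchema) {
        return Arrays.equals(existingSchema, proposedSchema);
    }
}
```

Keeping the payload opaque at this level means the broker would not need a protocol change to support a new schema format; only a new plugin would be required.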
[GitHub] mgodave commented on a change in pull request #1232: Schema registry (1/4)
mgodave commented on a change in pull request #1232: Schema registry (1/4) URL: https://github.com/apache/incubator-pulsar/pull/1232#discussion_r167994769 ## File path: pulsar-common/src/main/proto/PulsarApi.proto ## @@ -22,6 +22,20 @@ package pulsar.proto; option java_package = "org.apache.pulsar.common.api.proto"; option optimize_for = LITE_RUNTIME; +message Schema { + required string name = 1; + required bytes version = 2; + required bytes schema_data = 7; Review comment: Yeah definitely, I deleted a few fields and didn't readjust the numbering.
[incubator-pulsar] branch master updated: Bumped master to 2.0.0-incubating-SNAPSHOT (#1226)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new e77faf4 Bumped master to 2.0.0-incubating-SNAPSHOT (#1226) e77faf4 is described below commit e77faf4acf760447850bd2e4c99acae24d2ae27c Author: Matteo Merli AuthorDate: Tue Feb 13 12:08:58 2018 -0800 Bumped master to 2.0.0-incubating-SNAPSHOT (#1226) --- all/pom.xml | 2 +- buildtools/pom.xml | 2 +- managed-ledger/pom.xml | 2 +- pom.xml | 2 +- pulsar-broker-auth-athenz/pom.xml| 2 +- pulsar-broker-common/pom.xml | 2 +- pulsar-broker-shaded/pom.xml | 2 +- pulsar-broker/pom.xml| 2 +- pulsar-checksum/pom.xml | 2 +- pulsar-client-admin-shaded/pom.xml | 2 +- pulsar-client-admin/pom.xml | 2 +- pulsar-client-auth-athenz/pom.xml| 2 +- pulsar-client-kafka-compat/pom.xml | 2 +- pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml | 2 +- pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml | 2 +- pulsar-client-shaded/pom.xml | 2 +- pulsar-client-tools/pom.xml | 2 +- pulsar-client/pom.xml| 2 +- pulsar-common/pom.xml| 2 +- pulsar-discovery-service/pom.xml | 2 +- pulsar-proxy/pom.xml | 2 +- pulsar-spark/pom.xml | 2 +- pulsar-storm/pom.xml | 2 +- pulsar-testclient/pom.xml| 2 +- pulsar-websocket/pom.xml | 2 +- pulsar-zookeeper-utils/pom.xml | 2 +- pulsar-zookeeper/pom.xml | 2 +- 27 files changed, 27 insertions(+), 27 deletions(-) diff --git a/all/pom.xml b/all/pom.xml index d7c2c98..deddfac 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/buildtools/pom.xml b/buildtools/pom.xml index fc75fe7..d561bfb 100644 --- a/buildtools/pom.xml +++ b/buildtools/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. 
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index 34a0792..d3570c6 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/pom.xml b/pom.xml index 7320661..192769f 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ org.apache.pulsar pulsar - 1.22.0-incubating-SNAPSHOT + 2.0.0-incubating-SNAPSHOT Pulsar Pulsar is a distributed pub-sub messaging platform with a very diff --git a/pulsar-broker-auth-athenz/pom.xml b/pulsar-broker-auth-athenz/pom.xml index 80c22be..48e78e8 100644 --- a/pulsar-broker-auth-athenz/pom.xml +++ b/pulsar-broker-auth-athenz/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT pulsar-broker-auth-athenz diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index 7a1d356..666fec8 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT pulsar-broker-common diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml index bda3037..efcfa76 100644 --- a/pulsar-broker-shaded/pom.xml +++ b/pulsar-broker-shaded/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 9a703e0..6cd1f89 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/pulsar-checksum/pom.xml b/pulsar-checksum/pom.xml index da4f778..fb61c6f 100644 --- a/pulsar-checksum/pom.xml +++ b/pulsar-checksum/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNA
[GitHub] merlimat closed pull request #1226: Bumped master to 2.0.0-incubating-SNAPSHOT
merlimat closed pull request #1226: Bumped master to 2.0.0-incubating-SNAPSHOT URL: https://github.com/apache/incubator-pulsar/pull/1226 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/all/pom.xml b/all/pom.xml index d7c2c9854..deddfacdf 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/buildtools/pom.xml b/buildtools/pom.xml index fc75fe739..d561bfbb1 100644 --- a/buildtools/pom.xml +++ b/buildtools/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml index 34a0792d7..d3570c6db 100644 --- a/managed-ledger/pom.xml +++ b/managed-ledger/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. 
diff --git a/pom.xml b/pom.xml index 7320661d5..192769fcf 100644 --- a/pom.xml +++ b/pom.xml @@ -33,7 +33,7 @@ org.apache.pulsar pulsar - 1.22.0-incubating-SNAPSHOT + 2.0.0-incubating-SNAPSHOT Pulsar Pulsar is a distributed pub-sub messaging platform with a very diff --git a/pulsar-broker-auth-athenz/pom.xml b/pulsar-broker-auth-athenz/pom.xml index 80c22be81..48e78e87a 100644 --- a/pulsar-broker-auth-athenz/pom.xml +++ b/pulsar-broker-auth-athenz/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT pulsar-broker-auth-athenz diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml index 7a1d35606..666fec85a 100644 --- a/pulsar-broker-common/pom.xml +++ b/pulsar-broker-common/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT pulsar-broker-common diff --git a/pulsar-broker-shaded/pom.xml b/pulsar-broker-shaded/pom.xml index bda3037eb..efcfa76dc 100644 --- a/pulsar-broker-shaded/pom.xml +++ b/pulsar-broker-shaded/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index 9a703e0d0..6cd1f8957 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/pulsar-checksum/pom.xml b/pulsar-checksum/pom.xml index da4f7785f..fb61c6fb8 100644 --- a/pulsar-checksum/pom.xml +++ b/pulsar-checksum/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index 6a2f406fc..631012175 100644 --- a/pulsar-client-admin-shaded/pom.xml +++ b/pulsar-client-admin-shaded/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. 
diff --git a/pulsar-client-admin/pom.xml b/pulsar-client-admin/pom.xml index 1751af3ce..0cfc9dc49 100644 --- a/pulsar-client-admin/pom.xml +++ b/pulsar-client-admin/pom.xml @@ -26,7 +26,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/pulsar-client-auth-athenz/pom.xml b/pulsar-client-auth-athenz/pom.xml index 2b4406181..e7df0fc48 100644 --- a/pulsar-client-auth-athenz/pom.xml +++ b/pulsar-client-auth-athenz/pom.xml @@ -25,7 +25,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pom.xml index 5a31a43df..e43d94269 100644 --- a/pulsar-client-kafka-compat/pom.xml +++ b/pulsar-client-kafka-compat/pom.xml @@ -27,7 +27,7 @@ org.apache.pulsar pulsar -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml index 5b1d3d36b..6e546156d 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml +++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/pom.xml @@ -27,7 +27,7 @@ org.apache.pulsar pulsar-client-kafka-compat -1.22.0-incubating-SNAPSHOT +2.0.0-incubating-SNAPSHOT .. diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka/pom.xml index 7212c3936..906000372 100644 --- a/p
[GitHub] ivankelly commented on issue #1231: Read from compacted topic ledger if available and enabled
ivankelly commented on issue #1231: Read from compacted topic ledger if available and enabled URL: https://github.com/apache/incubator-pulsar/pull/1231#issuecomment-365368533 I need to fix up the tests on this; I think some mocks are missing implementations.
[GitHub] merlimat commented on a change in pull request #1232: Schema registry (1/4)
merlimat commented on a change in pull request #1232: Schema registry (1/4) URL: https://github.com/apache/incubator-pulsar/pull/1232#discussion_r167958385 ## File path: pulsar-common/src/main/proto/PulsarApi.proto ## @@ -22,6 +22,20 @@ package pulsar.proto; option java_package = "org.apache.pulsar.common.api.proto"; option optimize_for = LITE_RUNTIME; +message Schema { + required string name = 1; + required bytes version = 2; + required bytes schema_data = 7; Review comment: We should probably use 3 & 4 here to avoid implying we have retired some fields
[GitHub] merlimat commented on a change in pull request #1232: Schema registry (1/4)
merlimat commented on a change in pull request #1232: Schema registry (1/4)
URL: https://github.com/apache/incubator-pulsar/pull/1232#discussion_r167959766
## File path: pulsar-common/src/main/proto/PulsarApi.proto ##
@@ -22,6 +22,20 @@ package pulsar.proto;
 option java_package = "org.apache.pulsar.common.api.proto";
 option optimize_for = LITE_RUNTIME;
+message Schema {
+    required string name = 1;
+    required bytes version = 2;
+    required bytes schema_data = 7;
+    repeated KeyValue properties = 8;
+}
Review comment: How are we identifying the schema type? (e.g. JSON vs Avro vs ...)
[GitHub] merlimat commented on a change in pull request #1232: Schema registry (1/4)
merlimat commented on a change in pull request #1232: Schema registry (1/4)
URL: https://github.com/apache/incubator-pulsar/pull/1232#discussion_r167959515
## File path: pulsar-common/src/main/proto/PulsarApi.proto ##
@@ -272,6 +289,8 @@ message CommandProducer {
     /// Add optional metadata key=value to this producer
     repeated KeyValue metadata= 6;
+
+    optional int64 schema_version = 7;
Review comment: I think that when creating a producer or consumer we need to specify the actual schema so that we can validate it on the broker side; if validation fails, the producer creation fails.
[GitHub] merlimat closed pull request #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client
merlimat closed pull request #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client URL: https://github.com/apache/incubator-pulsar/pull/1230 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index 2309ebbf3..d4b8e4d86 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -25,7 +25,6 @@ import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; @@ -48,7 +47,7 @@ private static final Logger log = LoggerFactory.getLogger(ProxyForwardAuthDataTest.class); private int webServicePort; private int servicePort; - + @BeforeMethod @Override protected void setup() throws Exception { @@ -60,11 +59,11 @@ protected void setup() throws Exception { conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); conf.setBrokerClientAuthenticationParameters("authParam:broker"); conf.setAuthenticateOriginalAuthData(true); - + Set superUserRoles = new HashSet(); superUserRoles.add("admin"); conf.setSuperUserRoles(superUserRoles); - + Set providers = new HashSet(); providers.add(BasicAuthenticationProvider.class.getName()); conf.setAuthenticationProviders(providers); @@ -79,9 +78,9 @@ protected void setup() throws Exception { @Override protected void cleanup() throws 
Exception { -super.internalCleanup(); +super.internalCleanup(); } - + @Test void testForwardAuthData() throws Exception { log.info("-- Starting {} test --", methodName); @@ -95,15 +94,13 @@ void testForwardAuthData() throws Exception { String subscriptionName = "my-subscriber-name"; String clientAuthParams = "authParam:client"; String proxyAuthParams = "authParam:proxy"; - + admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); admin.namespaces().createNamespace(namespaceName); - admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); - // Step 2: Run Pulsar Proxy without forwarding authData - expect Exception ProxyConfiguration proxyConfig = new ProxyConfiguration(); proxyConfig.setAuthenticationEnabled(true); @@ -111,35 +108,29 @@ void testForwardAuthData() throws Exception { proxyConfig.setServicePort(servicePort); proxyConfig.setWebServicePort(webServicePort); proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT); - proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams); Set providers = new HashSet<>(); providers.add(BasicAuthenticationProvider.class.getName()); proxyConfig.setAuthenticationProviders(providers); -ProxyService proxyService = new ProxyService(proxyConfig); - -proxyService.start(); -PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams); -Consumer consumer; -boolean exceptionOccured = false; -try { -consumer = proxyClient.subscribe(topicName, subscriptionName); -} catch(Exception ex) { -exceptionOccured = true; -} -Assert.assertTrue(exceptionOccured); -proxyService.close(); - + +try (ProxyService proxyService = new 
ProxyService(proxyConfig); + PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams)) { +proxyService.start(); +proxyClient.subscribe(topicName, subscriptionName); +Assert.fail("Shouldn't be able to subscribe, auth required"); +} catch (PulsarClientException.AuthorizationException e) { +// expected behaviour +} + // Step 3: Create proxy with fo
[incubator-pulsar] branch master updated: ProxyForwardAuthDataTest shouldn't reuse pulsar client (#1230)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
    new 56ebe36 ProxyForwardAuthDataTest shouldn't reuse pulsar client (#1230)
56ebe36 is described below
commit 56ebe368ec2eb8569218ca4b438ee53f2bf5145e
Author: Ivan Kelly
AuthorDate: Tue Feb 13 18:54:29 2018 +0100

    ProxyForwardAuthDataTest shouldn't reuse pulsar client (#1230)

    * ProxyForwardAuthDataTest shouldn't reuse pulsar client

    This test was flaking due to the pulsar client being reused. The test starts a proxy service, subscribes to a topic, stops the proxy, starts a new proxy and tries to subscribe again.

    If using the same client for both connections, the client will have an open connection to the first proxy, which, when the proxy is stopped, will be closed by netty asynchronously. This can race with the second subscription attempt, and cause it to fail with ConnectionClosedException. Specifically, if the subscription attempt runs before the netty callback runs, the subscription attempt will try to subscribe on a dead connection.

    The immediate fix for this test, to stop the flaking, is just to use a different client for each attempt.
* Remove trailing whitespace --- .../proxy/server/ProxyForwardAuthDataTest.java | 55 +- 1 file changed, 23 insertions(+), 32 deletions(-) diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index 2309ebb..d4b8e4d 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -25,7 +25,6 @@ import java.util.Set; import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; @@ -48,7 +47,7 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(ProxyForwardAuthDataTest.class); private int webServicePort; private int servicePort; - + @BeforeMethod @Override protected void setup() throws Exception { @@ -60,11 +59,11 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase { conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName()); conf.setBrokerClientAuthenticationParameters("authParam:broker"); conf.setAuthenticateOriginalAuthData(true); - + Set superUserRoles = new HashSet(); superUserRoles.add("admin"); conf.setSuperUserRoles(superUserRoles); - + Set providers = new HashSet(); providers.add(BasicAuthenticationProvider.class.getName()); conf.setAuthenticationProviders(providers); @@ -79,9 +78,9 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase { @Override protected void cleanup() throws Exception { -super.internalCleanup(); +super.internalCleanup(); } - + @Test void testForwardAuthData() throws Exception { log.info("-- Starting 
{} test --", methodName); @@ -95,15 +94,13 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase { String subscriptionName = "my-subscriber-name"; String clientAuthParams = "authParam:client"; String proxyAuthParams = "authParam:proxy"; - + admin.properties().createProperty("my-property", new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use"))); admin.namespaces().createNamespace(namespaceName); - admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); admin.namespaces().grantPermissionOnNamespace(namespaceName, "client", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); - // Step 2: Run Pulsar Proxy without forwarding authData - expect Exception ProxyConfiguration proxyConfig = new ProxyConfiguration(); proxyConfig.setAuthenticationEnabled(true); @@ -111,35 +108,29 @@ public class ProxyForwardAuthDataTest extends ProducerConsumerBase { proxyConfig.setServicePort(servicePort); proxyConfig.setWebServicePort(webServicePort); proxyConfig.setBrokerServiceURL("pulsar://localhost:" + BROKER_PORT); - proxyConfig.setBrokerClientAuthenticationPlugi
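The failure mode described in the commit message for #1230 can be illustrated with a small, deterministic model. This is only a sketch under stated assumptions: `FakeProxy`, `FakeClient`, and the address `proxy:6650` are hypothetical stand-ins, not Pulsar classes, and the model fixes the race at its worst case, where the cached connection has not yet been marked closed when the second subscription runs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the flaky test; none of these types come from Pulsar.
class FreshClientPerAttempt {

    // Stand-in for a proxy endpoint; close() marks it as stopped.
    static final class FakeProxy implements AutoCloseable {
        private boolean running = true;
        boolean isRunning() { return running; }
        @Override public void close() { running = false; }
    }

    // Stand-in for a client that caches one connection per address.
    // It keeps using the cached connection even after that proxy has stopped,
    // which models the window before the asynchronous close callback runs.
    static final class FakeClient implements AutoCloseable {
        private final Map<String, FakeProxy> connections = new HashMap<>();

        // Returns true if the subscription reaches a running proxy.
        boolean subscribe(String address, FakeProxy current) {
            FakeProxy cached = connections.computeIfAbsent(address, a -> current);
            return cached.isRunning();
        }

        @Override public void close() { connections.clear(); }
    }

    // One client across both proxies: the second attempt hits the stale connection.
    static boolean reuseClient() {
        try (FakeClient client = new FakeClient()) {
            FakeProxy first = new FakeProxy();
            client.subscribe("proxy:6650", first);
            first.close();
            try (FakeProxy second = new FakeProxy()) {
                return client.subscribe("proxy:6650", second);
            }
        }
    }

    // A new client per attempt: nothing is cached from the first proxy.
    static boolean freshClientPerAttempt() {
        try (FakeProxy first = new FakeProxy(); FakeClient client = new FakeClient()) {
            client.subscribe("proxy:6650", first);
        }
        try (FakeProxy second = new FakeProxy(); FakeClient client = new FakeClient()) {
            return client.subscribe("proxy:6650", second);
        }
    }

    public static void main(String[] args) {
        System.out.println("reused client succeeds: " + reuseClient());
        System.out.println("fresh client succeeds:  " + freshClientPerAttempt());
    }
}
```

Scoping each client to its own try-with-resources block mirrors the shape of the fixed test, where the proxy and the client for one step are opened and closed together.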
[GitHub] mgodave opened a new pull request #1232: Schema registry (1/4)
mgodave opened a new pull request #1232: Schema registry (1/4)
URL: https://github.com/apache/incubator-pulsar/pull/1232
See #1137 for reference
[GitHub] sijie commented on issue #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
sijie commented on issue #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#issuecomment-365325159
@merlimat I have addressed your comments; please take another look. If this looks good to you, I will rebase it onto the latest master.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915637 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map , when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915413 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map , when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167915437 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map , when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167914612
## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java ##
@@ -124,4 +124,14 @@
      * @return the key of the message
      */
     String getKey();
+
+    /**
+     * Get the topic name of this message.
+     * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic.
Review comment: Thanks, will change it.
[GitHub] mgodave commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client
mgodave commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client
URL: https://github.com/apache/incubator-pulsar/pull/1230#issuecomment-365314914
Looks good to me; someone with permissions will have to review it, though.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167914515
## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java ##
@@ -124,4 +124,14 @@
      * @return the key of the message
      */
     String getKey();
+
+    /**
+     * Get the topic name of this message.
+     * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic.
+     *
+     * @return the topic name
+     */
+    default String getTopicName() {
Review comment: Thanks, will change it.
[GitHub] sijie commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
sijie commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167913386
## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/ActiveConsumerListener.java ##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Listener on the consumer state changes.
+ */
+public interface ActiveConsumerListener {
Review comment: I think a lambda is good for non-branching operations, but it doesn't fit branching operations well. We could make a single method here using `Status`, but then the application logic has to write an `if-else` block to take different actions. It is much clearer to me if we have two different methods for two different actions. So I chose to have separate methods for handling becoming active and becoming inactive: when applications are writing the listener, they only need to think about what to do when the consumer becomes active and what to do when it becomes inactive. However, if you think using `Status` and having a lambda interface is much better, let me know and I will make the change.
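The trade-off raised in this review comment can be sketched side by side. This is a minimal illustration, not the code from the pull request: the type names `TwoMethodListener` and `StatusListener`, the method names, and the `String topic` parameter are all assumptions made for the example; only the idea of a `Status` value versus two separate callbacks comes from the comment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these signatures are taken from the pull request.
class ListenerDesigns {

    // Design A: one callback per transition, so application code does not branch.
    interface TwoMethodListener {
        void becameActive(String topic);
        void becameInactive(String topic);
    }

    // Design B: a single-method interface that receives a status and can be a lambda.
    enum Status { ACTIVE, INACTIVE }

    @FunctionalInterface
    interface StatusListener {
        void statusChanged(String topic, Status status);
    }

    // Adapter from design A to design B; the if-else is written once, here.
    static StatusListener adapt(TwoMethodListener delegate) {
        return (topic, status) -> {
            if (status == Status.ACTIVE) {
                delegate.becameActive(topic);
            } else {
                delegate.becameInactive(topic);
            }
        };
    }

    // Exercises both designs and records what each listener observed.
    static List<String> demo() {
        List<String> events = new ArrayList<>();

        TwoMethodListener a = new TwoMethodListener() {
            @Override public void becameActive(String topic) { events.add("A active " + topic); }
            @Override public void becameInactive(String topic) { events.add("A inactive " + topic); }
        };

        // With design B, the application writes the branch itself.
        StatusListener b = (topic, status) -> {
            if (status == Status.ACTIVE) {
                events.add("B active " + topic);
            } else {
                events.add("B inactive " + topic);
            }
        };

        a.becameActive("t1");
        b.statusChanged("t1", Status.INACTIVE);
        adapt(a).statusChanged("t2", Status.INACTIVE);
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The adapter shows that the two shapes are interchangeable in principle; the choice is mostly about where the branching lives and whether callers can pass a lambda.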
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167911587 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map , when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
[GitHub] sijie commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
sijie commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167911055 ## File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java ## @@ -81,20 +82,33 @@ public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int part protected abstract void cancelPendingRead(); -protected void pickAndScheduleActiveConsumer() { +protected void notifyConsumerGroupChanged(Consumer activeConsumer) { +if (null != activeConsumer && subscriptionType == SubType.Failover) { +consumers.forEach(consumer -> Review comment: This is the first iteration of this listener. I am trying to avoid any optimization at the beginning and to deliver a notification for every change. This way we can make sure the first version is "eventually" correct without hitting any race condition in handling this kind of smart logic. If sending multiple notifications becomes a problem, we can make a subsequent change later. Does that make sense to you?
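As a rough illustration of the "notify on every change" approach described above, the sketch below tells every consumer in the group whether it is the active one each time the method runs, without checking whether anything actually changed. `FakeConsumer` and `notifyActive` are hypothetical stand-ins; only the method name `notifyConsumerGroupChanged` comes from the diff.

```java
import java.util.ArrayList;
import java.util.List;

public class NotifyEveryChange {
    // Hypothetical stand-in for a broker-side consumer handle.
    static class FakeConsumer {
        final String name;
        final List<Boolean> notifications = new ArrayList<>();

        FakeConsumer(String name) {
            this.name = name;
        }

        // Records each notification instead of sending it over the wire.
        void notifyActive(boolean isActive) {
            notifications.add(isActive);
        }
    }

    // Unoptimized: every call notifies every consumer, even if the active one is unchanged.
    static void notifyConsumerGroupChanged(List<FakeConsumer> consumers, FakeConsumer active) {
        if (active == null) {
            return;
        }
        for (FakeConsumer c : consumers) {
            c.notifyActive(c == active);
        }
    }

    public static void main(String[] args) {
        FakeConsumer a = new FakeConsumer("a");
        FakeConsumer b = new FakeConsumer("b");
        List<FakeConsumer> group = new ArrayList<>();
        group.add(a);
        group.add(b);

        notifyConsumerGroupChanged(group, a);
        notifyConsumerGroupChanged(group, a); // same active consumer, notified again
        System.out.println(a.notifications + " " + b.notifications);
    }
}
```

The repeated notifications are redundant but harmless as long as listeners treat them idempotently, which is the property the comment relies on for the first version.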
[GitHub] sijie commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group
sijie commented on a change in pull request #1156: Introduce ActiveConsumerListener for realizing if a consumer is active in a failover subscription group URL: https://github.com/apache/incubator-pulsar/pull/1156#discussion_r167909684 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java ## @@ -241,6 +242,13 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined")); } +if (conf.getActiveConsumerListener() != null Review comment: The main motivation for this change is to support failover subscriptions. I would disable this feature for other subscription types for now, and introduce it later if we really need it. Does that make sense to you?
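The check discussed above could look roughly like the following. This is a sketch under assumptions: the enum is a local stand-in for the client's subscription type, the listener is modeled as a plain `Object`, and the exception type is chosen for illustration; the truncated condition in the diff is not reconstructed here.

```java
public class ListenerValidation {
    // Local stand-in for the client's subscription types.
    enum SubscriptionType { Exclusive, Shared, Failover }

    // Rejects a configured listener on anything other than a failover subscription.
    static void validate(SubscriptionType type, Object activeConsumerListener) {
        if (activeConsumerListener != null && type != SubscriptionType.Failover) {
            throw new IllegalArgumentException(
                    "Active consumer listener is only supported for failover subscriptions");
        }
    }

    public static void main(String[] args) {
        Object listener = new Object();
        validate(SubscriptionType.Failover, listener); // accepted
        validate(SubscriptionType.Shared, null);       // accepted, no listener set
        try {
            validate(SubscriptionType.Shared, listener);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing at subscribe time keeps the feature narrow now and leaves room to relax the check later without changing behavior for existing failover users.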
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167907583 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map , when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
[GitHub] sijie commented on issue #1223: Add a `backend` admin restful endpoint for query backend information
sijie commented on issue #1223: Add a `backend` admin restful endpoint for query backend information URL: https://github.com/apache/incubator-pulsar/pull/1223#issuecomment-365307868 @yush1ga @rdhabalia The combination of zkservers and ledgersrootpath is actually the "bookkeeper service url"; zkservers indicates where Pulsar stores its metadata, and globalZooKeeperServers indicates where Pulsar stores policies data. So these 3 settings together actually describe the "backend" for Pulsar brokers. The purpose of this PR is to provide a way for an application to know which zk and bk a Pulsar cluster is using. I am fine with changing the name to whatever you think is better. So which name should I use? InternalConfiguration/InternalData? Or something else? If we can reach a consensus on the naming, I can make the modification. /cc @merlimat
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167906488 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java ## @@ -18,26 +18,24 @@ */ package org.apache.pulsar.client.impl; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; import java.io.Closeable; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; - +import java.util.function.Predicate; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; - public class UnAckedMessageTracker implements Closeable { Review comment: Thanks, I will make a class UnAckedTopicMessageTracker that extends UnAckedMessageTracker.
[GitHub] zhaijack commented on issue #1059: Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli)
zhaijack commented on issue #1059: Issue 1014: Rename "global zookeeper" to "configuration-store"(change in code, conf and cli) URL: https://github.com/apache/incubator-pulsar/pull/1059#issuecomment-365298276 retest this please
[GitHub] zhaijack commented on issue #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
zhaijack commented on issue #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#issuecomment-365287664 Thanks, I have resolved all the comments and fixed the rebase conflict.
[GitHub] sijie commented on issue #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
sijie commented on issue #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#issuecomment-365286252 @zhaijack: can you address @merlimat's and @ivankelly's comments? We need this change for the function worker.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167879404 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java ## @@ -468,17 +468,20 @@ public void redeliverUnacknowledgedMessages() { } @Override -public void redeliverUnacknowledgedMessages(Set messageIds) { +public void redeliverUnacknowledgedMessages(Set messageIds) { Review comment: Oh, it is in ConsumerBase.java; sorry for the wrong reference to Consumer.java. TopicMessageIdImpl needs to be more of a wrapper for MessageIdImpl and, as in the reply below, it may be better for it to implement MessageId instead of extending MessageIdImpl.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167252486 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java ## @@ -37,6 +36,16 @@ */ byte[] toByteArray(); +/** + * Get the topic name of this MessageId. + * This is mainly for TopicsConsumerImpl to identify a message belongs to which topic. + * + * @return the topic name + */ +default String getTopicName() { Review comment: Yes, it is as @merlimat mentioned. For MessageId, we also need `topicName` for `redeliverUnacknowledgedMessages`. I will remove it from MessageId.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167877509 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map , when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; Review comment: Thanks. Yes. There is a plan to make this extends PartitionedConsumerImpl, that is also the reason that this change keep use a lot of same interface. 
But it may be good to keep them separate at first; this could avoid introducing bugs into PartitionedConsumerImpl.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167864929 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java ## @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl; + +import java.util.Map; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; + +public class TopicMessageImpl implements Message { Review comment: Thanks. Here it is also mainly a wrapper for the original MessageImpl; the original MessageImpl will be used by one internal sub-consumer. With extension, the constructor and get methods may not be easy to handle.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167864929 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java ## @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.client.impl; + +import java.util.Map; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; + +public class TopicMessageImpl implements Message { Review comment: Thanks. Here it is also mainly a wrapper for the original MessageImpl; the original MessageImpl will be used by one internal sub-consumer. Extending it may not be easy to handle.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167863222 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java ## @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.client.api.MessageId; + +public class TopicMessageIdImpl implements MessageId { Review comment: Thanks. Here TopicMessageIdImpl is mainly a wrapper for MessageIdImpl. We need to keep a reference to the MessageIdImpl because it will be used by one internal sub-consumer. If we extended it, the constructor and getInnerMessageIdInner might not be easy to handle.
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167863222 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java ## @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import org.apache.pulsar.client.api.MessageId; + +public class TopicMessageIdImpl implements MessageId { Review comment: Thanks. Here TopicMessageIdImpl is mainly a wrapper for MessageIdImpl. We need to keep a reference to the MessageIdImpl because it will be used by each internal sub-consumer. If we extended it, the constructor and getInnerMessageIdInner might not be easy to handle.
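The wrapper-versus-subclass choice described above can be sketched as plain composition. The types below are simplified local stand-ins, not the Pulsar classes: `MessageId` here carries only `toByteArray()`, and `SimpleMessageId`, `TopicMessageId`, and `getInnerMessageId` are hypothetical names used for illustration.

```java
import java.util.Arrays;

public class WrapperSketch {
    // Simplified stand-in for the client's MessageId interface.
    interface MessageId {
        byte[] toByteArray();
    }

    // Stand-in for the per-topic message id that a sub-consumer understands.
    static final class SimpleMessageId implements MessageId {
        private final byte[] bytes;

        SimpleMessageId(byte[] bytes) {
            this.bytes = bytes.clone();
        }

        @Override
        public byte[] toByteArray() {
            return bytes.clone();
        }
    }

    // Wrapper: adds the topic name and keeps a reference to the original id.
    static final class TopicMessageId implements MessageId {
        private final String topicName;
        private final MessageId inner;

        TopicMessageId(String topicName, MessageId inner) {
            this.topicName = topicName;
            this.inner = inner;
        }

        String getTopicName() {
            return topicName;
        }

        // The untouched inner id is what gets handed back to the sub-consumer.
        MessageId getInnerMessageId() {
            return inner;
        }

        @Override
        public byte[] toByteArray() {
            return inner.toByteArray();
        }
    }

    public static void main(String[] args) {
        MessageId inner = new SimpleMessageId(new byte[] {1, 2, 3});
        TopicMessageId wrapped = new TopicMessageId("topic-a", inner);
        System.out.println(wrapped.getTopicName() + " "
                + Arrays.toString(wrapped.toByteArray()));
    }
}
```

Because the wrapper only holds a reference, the inner id can be passed to the sub-consumer unchanged, and the wrapper needs no knowledge of how the inner id is constructed.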
[GitHub] maskit commented on issue #1225: Enable specification of TLS Protocol Versions and Cipher Suites
maskit commented on issue #1225: Enable specification of TLS Protocol Versions and Cipher Suites URL: https://github.com/apache/incubator-pulsar/pull/1225#issuecomment-365254767 @jai1 You may want to read these. If I understand correctly, HTTP entrypoints, which use `SecurityUtility::createSslContext`, may accept any protocol versions, even if we change the string to "TLS1.2". https://docs.oracle.com/javase/9/docs/api/javax/net/ssl/SSLContext.html#getInstance-java.lang.String- https://docs.oracle.com/javase/9/docs/specs/security/standard-names.html#sslcontext-algorithms https://stackoverflow.com/questions/43481010/how-to-enable-only-tlsv1-2-on-java-8-server-application?noredirect=1&lq=1
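The point raised above is that the name passed to `SSLContext.getInstance` selects a context that supports that version but, per the JDK standard-names documentation, may also support others. One way to pin the version, shown here as a minimal sketch using only JDK classes, is to restrict the enabled protocols on the engine (or socket) created from the context. The class and method names `TlsProtocolPinning` and `pinnedProtocols` are made up for this example, and this is not necessarily how the pull request solves it.

```java
import java.security.GeneralSecurityException;
import java.util.Arrays;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

public class TlsProtocolPinning {
    // Builds an engine whose enabled protocol list is restricted to TLSv1.2.
    static String[] pinnedProtocols() {
        try {
            SSLContext ctx = SSLContext.getInstance("TLSv1.2");
            // Default key managers, trust managers, and randomness source.
            ctx.init(null, null, null);

            SSLEngine engine = ctx.createSSLEngine();
            // The context name alone does not guarantee this restriction.
            engine.setEnabledProtocols(new String[] {"TLSv1.2"});
            return engine.getEnabledProtocols();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("TLS setup failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(pinnedProtocols()));
    }
}
```

The same restriction can be applied through `SSLParameters.setProtocols` when the engine or socket is configured by a framework rather than directly.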
[GitHub] ivankelly opened a new pull request #1231: Read from compacted topic ledger if available and enabled
ivankelly opened a new pull request #1231: Read from compacted topic ledger if available and enabled URL: https://github.com/apache/incubator-pulsar/pull/1231 If a topic has been compacted and the client has enabled reads from compacted topics, try to read from the compacted ledger if the cursor position lands before or within the range of message IDs in the compacted topic ledger. If the cursor position lands after the message IDs in the compacted topic ledger, read from the cursor as normal.
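The read-path rule described above can be sketched as a small decision function. Everything here is hypothetical and for illustration only: `Position`, `Source`, and `chooseSource` are not names from the pull request, and the sketch assumes that "before or within the range" means the cursor position is at or before the last message ID held in the compacted ledger.

```java
public class CompactedReadSketch {
    enum Source { COMPACTED_LEDGER, CURSOR }

    // Hypothetical stand-in for a message position, ordered by ledger id and then entry id.
    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    // Decides where the next read should come from.
    static Source chooseSource(boolean topicCompacted, boolean readCompactedEnabled,
                               Position cursor, Position lastCompacted) {
        if (!topicCompacted || !readCompactedEnabled) {
            return Source.CURSOR; // nothing compacted, or the client did not opt in
        }
        // At or before the last compacted message id: serve from the compacted ledger.
        return cursor.compareTo(lastCompacted) <= 0 ? Source.COMPACTED_LEDGER : Source.CURSOR;
    }

    public static void main(String[] args) {
        Position lastCompacted = new Position(3, 0);
        System.out.println(chooseSource(true, true, new Position(1, 5), lastCompacted));
        System.out.println(chooseSource(true, true, new Position(3, 1), lastCompacted));
    }
}
```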
[GitHub] ivankelly commented on issue #1231: Read from compacted topic ledger if available and enabled
ivankelly commented on issue #1231: Read from compacted topic ledger if available and enabled URL: https://github.com/apache/incubator-pulsar/pull/1231#issuecomment-365215391 @merlimat
[GitHub] zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
zhaijack commented on a change in pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace URL: https://github.com/apache/incubator-pulsar/pull/1103#discussion_r167813669 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java ## @@ -0,0 +1,881 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.util.ConsumerName; +import org.apache.pulsar.client.util.FutureUtil; +import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.naming.NamespaceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TopicsConsumerImpl extends ConsumerBase { + +// All topics should be in same namespace +protected NamespaceName namespaceName; + +// Map , when get do ACK, consumer will by find by topic name +private final ConcurrentHashMap consumers; + +// Map , store partition number for each topic +private final ConcurrentHashMap topics; + +// Queue of partition consumers on which we have stopped calling 
receiveAsync() because the +// shared incoming queue was full +private final ConcurrentLinkedQueue pausedConsumers; + +// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to +// resume receiving from the paused consumer partitions +private final int sharedQueueResumeThreshold; + +// sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number. +AtomicInteger numberTopicPartitions; + +private final ReadWriteLock lock = new ReentrantReadWriteLock(); +private final ConsumerStats stats; +private final UnAckedMessageTracker unAckedMessageTracker; +private final ConsumerConfiguration internalConfig; + +TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription, + ConsumerConfiguration conf, ExecutorService listenerExecutor, + CompletableFuture subscribeFuture) { +super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription, +conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, +subscribeFuture); + +checkArgument(conf.getReceiverQueueSize() > 0, +"Receiver queue size needs to be greater than 0 for Topics Consumer"); + +this.topics = new ConcurrentHashMap<>(); +this.consumers = new ConcurrentHashMap<>(); +this.pausedConsumers = new ConcurrentLinkedQueue<>(); +this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; +this.numberTopicPartitions = new AtomicInteger(0); + +if (conf.getAckTimeoutMillis() != 0) { +this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
svn commit: r25025 - /dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/
Author: jai1 Date: Tue Feb 13 09:59:07 2018 New Revision: 25025 Log: Staging artifacts and signature for Pulsar release 1.22.0-incubating Added: dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/ dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz (with props) dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.asc dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.md5 dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.sha512 Added: dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz == Binary file - no diff available. Propchange: dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz -- svn:mime-type = application/octet-stream Added: dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.asc == --- dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.asc (added) +++ dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.asc Tue Feb 13 09:59:07 2018 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCAAdFiEEDQCP4t9TLRC/fG0se6GmTLvBFOwFAlqCtq8ACgkQe6GmTLvB +FOyEwhAAt0OFdejzQTJi7KA2Qv+mhazVuBEZfygwyRjNy7pNoX+CI0tIDCo6myDd +jfUjx3Fg6hFR3WGzLW0+n+meHxsWrcpiqoR6vV7PjH+CpxJXY6+ZrS3/0QuDdAT/ +/ZlSStC8S3AT6mSRJzjab9HGWIwsE+QZmuiLzjzkUTBDNZ4QAstbN57MY4DrDDe3 +HbSKoCDrPWp+eWmuimD2U1fhxMpPKaUonXLvH1jAhsLYNUt+yS7o0UOlFm+gbktK +h2t4T+t2QeHzefkh8+Tud1EV0wObTbel54qda3LKGwgGUwPCMs7UxZ/oeSJTd25E +oKAizymEcdcnpPHBYfq7pr/QhPW2bpO1OVjVKBk8mN3SjVO1366MawxrAfr1AKby +wQcy8kVN2YskM0VSYl+Ck5BRJl//NO59oGFIPtp7IXy0F/yqokwkVJC4I7YPM3ua +r5Ozixl8+NFDPE+8SVrmUASiHJLbjG7RBT3SyWuzEMV642uQY6FHp6NMlncBczkK 
+slkXz/U5i+E/GM+5Q+MZ7HmwzrCmicYgqiUI8pryYLLPxRjtpiZ9f074jiRtXxFO +5HRZFackUg0Zz2e0pBhv8FCfLvMc4NiCt8LvqmcPNnXEjMc6m4/Vz+6UheLRBRK+ +wnVxieyKxZYDxFe47xorGMwuGa7M9S4BIWxBuVQ6/lVi2TIQXQw= +=KFvU +-END PGP SIGNATURE- Added: dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.md5 == --- dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.md5 (added) +++ dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.md5 Tue Feb 13 09:59:07 2018 @@ -0,0 +1,2 @@ +apache-pulsar-1.22.0-incubating-src.tar.gz: +29 F0 89 45 04 08 AD 06 6F 75 0C B7 91 6F F9 9F Added: dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.sha512 == --- dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.sha512 (added) +++ dev/incubator/pulsar/pulsar-1.22.0-incubating-candidate-0/apache-pulsar-1.22.0-incubating-src.tar.gz.sha512 Tue Feb 13 09:59:07 2018 @@ -0,0 +1,3 @@ +apache-pulsar-1.22.0-incubating-src.tar.gz: +52ED243F 8DE5BAFA BC430118 9629D216 577AC2AF 4B186317 A8BC513C 4D87FE38 82C7088F + ED1E9B1E D7CAD96D A5287730 15A55080 32BA182A D9A60619 C6813D48
[GitHub] zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic
zhaijack commented on a change in pull request #1066: Issue 937: add CommandGetLastMessageId to make reader know the end of topic URL: https://github.com/apache/incubator-pulsar/pull/1066#discussion_r167811273 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java ## @@ -1248,6 +1254,77 @@ public void seek(MessageId messageId) throws PulsarClientException { return seekFuture; } +public boolean hasMessageAvailable() throws PulsarClientException { +try { +return hasMessageAvailableAsync().get(); +} catch (ExecutionException | InterruptedException e) { +throw new PulsarClientException(e); +} +} + +public CompletableFuture<Boolean> hasMessageAvailableAsync() { +final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>(); + +if (lastMessageIdInBroker.compareTo(lastDequeuedMessage) > 0 && Review comment: Thanks, will add it.
[GitHub] ivankelly commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client
ivankelly commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client URL: https://github.com/apache/incubator-pulsar/pull/1230#issuecomment-365205415 @jai1 @merlimat @mgodave
[GitHub] ivankelly commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client
ivankelly commented on issue #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client URL: https://github.com/apache/incubator-pulsar/pull/1230#issuecomment-365204264 This patch addresses #1211
[GitHub] ivankelly opened a new pull request #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client
ivankelly opened a new pull request #1230: ProxyForwardAuthDataTest shouldn't reuse pulsar client URL: https://github.com/apache/incubator-pulsar/pull/1230 This test was flaking due to the pulsar client being reused. The test starts a proxy service, subscribes to a topic, stops the proxy, starts a new proxy and tries to subscribe again. If using the same client for both connections, the client will have an open connection to the first proxy, which, when the proxy is stopped, will be closed by netty asynchronously. This can race with the second subscription attempt, and cause it to fail with ConnectionClosedException. Specifically, if the subscription attempt runs before the netty callback runs, the subscription attempt will try to subscribe on a dead connection. The immediate fix for this test, to stop the flaking, is just to use a different client for each attempt.
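The race described above can be modelled deterministically with a toy sketch. None of the types below are Pulsar or Netty classes; `Connection`, `Client`, and `stopProxy` are hypothetical, and the asynchronous close callback is represented by a `Runnable` that the caller runs later, so the ordering that loses the race can be reproduced on purpose.

```java
public class ClientReuseSketch {
    // Toy connection: the peer can be dead before the client has been told about it.
    static final class Connection {
        boolean peerAlive = true;    // false as soon as the proxy is stopped
        boolean knownClosed = false; // true only once the close callback has run
    }

    // Toy client that caches its connection and reuses it until it learns it was closed.
    static final class Client {
        private Connection cached;

        boolean subscribe() {
            if (cached == null || cached.knownClosed) {
                cached = new Connection(); // fresh connection to whichever proxy is running now
            }
            return cached.peerAlive; // false models a failure on a dead connection
        }

        Connection connection() {
            return cached;
        }
    }

    // Stops the proxy behind a connection and returns the pending close callback,
    // standing in for the callback that the network layer would run asynchronously.
    static Runnable stopProxy(Connection connection) {
        connection.peerAlive = false;
        return () -> connection.knownClosed = true;
    }

    public static void main(String[] args) {
        Client shared = new Client();
        System.out.println("first subscribe: " + shared.subscribe());

        Runnable pendingClose = stopProxy(shared.connection());
        // A new proxy is assumed to be running from here on.

        System.out.println("reused client before callback: " + shared.subscribe());
        System.out.println("fresh client: " + new Client().subscribe());

        pendingClose.run();
        System.out.println("reused client after callback: " + shared.subscribe());
    }
}
```

In this model a fresh client per attempt never holds a stale cached connection, which corresponds to the fix chosen for the test.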