This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 754b864cf5f8844881eb9d47f4eaba6b4fb6d5c2 Author: lipenghui <peng...@apache.org> AuthorDate: Fri Jul 17 09:22:40 2020 +0800 Handle NotAllowed Exception at the client side. (#7430) * Handle NotAllowed Exception at the client side. (cherry picked from commit f8b2a2334fb7d2dc5266242a6393c9cc434fba60) --- .../broker/service/BrokerServiceException.java | 2 ++ .../client/api/KeySharedSubscriptionTest.java | 2 +- .../pulsar/client/api/PulsarClientException.java | 22 ++++++++++++++++++++++ pulsar-client-cpp/include/pulsar/Result.h | 1 + pulsar-client-cpp/lib/ClientConnection.cc | 3 +++ pulsar-client-cpp/lib/Result.cc | 3 +++ .../org/apache/pulsar/client/impl/ClientCnx.java | 2 ++ .../apache/pulsar/common/api/proto/PulsarApi.java | 3 +++ pulsar-common/src/main/proto/PulsarApi.proto | 1 + 9 files changed, 38 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 6d0e50f..7ec97ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -212,6 +212,8 @@ public class BrokerServiceException extends Exception { return ServerError.TransactionCoordinatorNotFound; } else if (t instanceof CoordinatorException.InvalidTxnStatusException) { return ServerError.InvalidTxnStatus; + } else if (t instanceof NotAllowedException) { + return ServerError.NotAllowedError; } else { if (checkCauseIfUnknown) { return getClientErrorCode(t.getCause(), false); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index 2a7a20b..610c4d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -397,7 +397,7 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase { receiveAndCheck(checkList); } - @Test(expectedExceptions = PulsarClientException.class) + @Test(expectedExceptions = PulsarClientException.NotAllowedException.class) public void testDisableKeySharedSubscription() throws PulsarClientException { this.conf.setSubscriptionKeySharedEnable(false); String topic = "persistent://public/default/key_shared_disabled"; diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java index 6a6e42a..597e0d5 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java @@ -640,6 +640,23 @@ public class PulsarClientException extends IOException { } /** + * Not allowed exception thrown by Pulsar client. + */ + public static class NotAllowedException extends PulsarClientException { + + /** + * Constructs an {@code NotAllowedException} with the specified detail message. + * + * @param msg + * The detail message (which is saved for later retrieval + * by the {@link #getMessage()} method) + */ + public NotAllowedException(String msg) { + super(msg); + } + } + + /** * Full producer queue error thrown by Pulsar client. */ public static class ProducerQueueIsFullError extends PulsarClientException { @@ -790,6 +807,8 @@ public class PulsarClientException extends IOException { return new InvalidTopicNameException(msg); } else if (t instanceof NotSupportedException) { return new NotSupportedException(msg); + } else if (t instanceof NotAllowedException) { + return new NotAllowedException(msg); } else if (t instanceof ProducerQueueIsFullError) { return new ProducerQueueIsFullError(msg); } else if (t instanceof ProducerBlockedQuotaExceededError) { @@ -873,6 +892,8 @@ public class PulsarClientException extends IOException { return new InvalidTopicNameException(msg); } else if (cause instanceof NotSupportedException) { return new NotSupportedException(msg); + } else if (cause instanceof NotAllowedException) { + return new NotAllowedException(msg); } else if (cause instanceof ProducerQueueIsFullError) { return new ProducerQueueIsFullError(msg); } else if (cause instanceof ProducerBlockedQuotaExceededError) { @@ -911,6 +932,7 @@ public class PulsarClientException extends IOException { || t instanceof InvalidMessageException || t instanceof InvalidTopicNameException || t instanceof NotSupportedException + || t instanceof NotAllowedException || t instanceof ChecksumException || t instanceof CryptoException || t instanceof ConsumerAssignException diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h index b2a16c5..6dd7e45 100644 --- a/pulsar-client-cpp/include/pulsar/Result.h +++ b/pulsar-client-cpp/include/pulsar/Result.h @@ -82,6 +82,7 @@ enum Result /// Shared and Key_Shared subscription mode ResultTransactionCoordinatorNotFoundError, /// Transaction coordinator not found ResultInvalidTxnStatusError, /// Invalid txn status error + ResultNotAllowedError, /// Not allowed }; // Return string representation of result code diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index fe43805..3628dd5 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -117,6 +117,9 @@ static Result getResult(ServerError serverError) { case InvalidTxnStatus: return ResultInvalidTxnStatusError; + + case NotAllowedError: + return ResultNotAllowedError; } // NOTE : Do not add default case in the switch above. In future if we get new cases for // ServerError and miss them in the switch above we would like to get notified. Adding diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc index fc4d81f..3c1c2a8 100644 --- a/pulsar-client-cpp/lib/Result.cc +++ b/pulsar-client-cpp/lib/Result.cc @@ -144,6 +144,9 @@ const char* strResult(Result result) { case ResultInvalidTxnStatusError: return "ResultInvalidTxnStatusError"; + + case ResultNotAllowedError: + return "ResultNotAllowedError"; }; // NOTE : Do not add default case in the switch above. In future if we get new cases for // ServerError and miss them in the switch above we would like to get notified. Adding diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index d63cbc1..ea1624b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -1003,6 +1003,8 @@ public class ClientCnx extends PulsarHandler { return new PulsarClientException.TopicDoesNotExistException(errorMsg); case ConsumerAssignError: return new PulsarClientException.ConsumerAssignException(errorMsg); + case NotAllowedError: + return new PulsarClientException.NotAllowedException(errorMsg); case UnknownError: default: return new PulsarClientException(errorMsg); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 02fed41..4d1babf 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -82,6 +82,7 @@ public final class PulsarApi { ConsumerAssignError(19, 19), TransactionCoordinatorNotFound(20, 20), InvalidTxnStatus(21, 21), + NotAllowedError(22, 22), ; public static final int UnknownError_VALUE = 0; @@ -106,6 +107,7 @@ public final class PulsarApi { public static final int ConsumerAssignError_VALUE = 19; public static final int TransactionCoordinatorNotFound_VALUE = 20; public static final int InvalidTxnStatus_VALUE = 21; + public static final int NotAllowedError_VALUE = 22; public final int getNumber() { return value; } @@ -134,6 +136,7 @@ public final class PulsarApi { case 19: return ConsumerAssignError; case 20: return TransactionCoordinatorNotFound; case 21: return InvalidTxnStatus; + case 22: return NotAllowedError; default: return null; } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index be5e4d4..c4acca2 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -196,6 +196,7 @@ enum ServerError { TransactionCoordinatorNotFound = 20; // Transaction coordinator not found error InvalidTxnStatus = 21; // Invalid txn status error + NotAllowedError = 22; // Not allowed error } enum AuthMethod {