[GitHub] merlimat commented on a change in pull request #2543: Add ServiceUrlProvider and add method forceCloseConnection in PulsarC…
merlimat commented on a change in pull request #2543: Add ServiceUrlProvider and add method forceCloseConnection in PulsarC… URL: https://github.com/apache/incubator-pulsar/pull/2543#discussion_r216139475 ## File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java ## @@ -706,6 +706,58 @@ public void shutdown() throws PulsarClientException { } } +@Override +public void forceCloseConnection() { +if (this.producers != null) { Review comment: There are a few problems with this way of closing the connections. `channelInactive()` is supposed to be called by Netty when the connection gets closed. With the current approach, the original TCP connection will stay open and the broker will still have registered the producer/consumer as connected. The right approach here should be to close all the connections in the `ConnectionPool`. That will trigger the reconnections for all producers/consumers. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #2529: Added cmake install for lib and headers
merlimat commented on issue #2529: Added cmake install for lib and headers URL: https://github.com/apache/incubator-pulsar/pull/2529#issuecomment-419668830 run java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat commented on issue #2511: [documentation] Add section about licensing at contribution guide
merlimat commented on issue #2511: [documentation] Add section about licensing at contribution guide URL: https://github.com/apache/incubator-pulsar/pull/2511#issuecomment-419668698 run java8 tests This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat closed pull request #2537: [schema] enable Schema.AUTO if functions or connectors are using GenericRecord
merlimat closed pull request #2537: [schema] enable Schema.AUTO if functions or connectors are using GenericRecord URL: https://github.com/apache/incubator-pulsar/pull/2537 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml index 9a8420608a..0f21bae460 100644 --- a/pulsar-functions/api-java/pom.xml +++ b/pulsar-functions/api-java/pom.xml @@ -37,6 +37,12 @@ slf4j-api + + ${project.groupId} + pulsar-client-schema + ${project.version} + + net.jodah typetools diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index e1059f390e..6eed8e0142 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -78,7 +78,7 @@ public void open(Map config, SourceContext sourceContext) throws inputConsumers = configs.entrySet().stream().map(e -> { String topic = e.getKey(); ConsumerConfig conf = e.getValue(); -log.info("Creating consumers for topic : {}", topic); +log.info("Creating consumers for topic : {}, schema : {}", topic, conf.getSchema()); ConsumerBuilder cb = pulsarClient.newConsumer(conf.getSchema()) // consume message even if can't decrypt and deliver it along with encryption-ctx .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index 2ac5b65764..76375dcc67 100644 --- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; @@ -68,24 +69,31 @@ public TopicSchema(PulsarClient client) { } public Schema getSchema(String topic, Class clazz, SchemaType schemaType) { -return cachedSchemas.computeIfAbsent(topic, t -> extractSchema(clazz, schemaType)); +return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(clazz, schemaType)); } /** * If the topic is already created, we should be able to fetch the schema type (avro, json, ...) */ private SchemaType getSchemaTypeOrDefault(String topic, Class clazz) { -Optional schema = ((PulsarClientImpl) client).getSchema(topic).join(); -if (schema.isPresent()) { -return schema.get().getType(); +if (GenericRecord.class.isAssignableFrom(clazz)) { +return SchemaType.AUTO; } else { -return getDefaultSchemaType(clazz); +Optional schema = ((PulsarClientImpl) client).getSchema(topic).join(); +if (schema.isPresent()) { +return schema.get().getType(); +} else { +return getDefaultSchemaType(clazz); +} } } private static SchemaType getDefaultSchemaType(Class clazz) { if (byte[].class.equals(clazz)) { return SchemaType.NONE; +} else if (GenericRecord.class.isAssignableFrom(clazz)) { +// the function is taking generic record, so we do auto schema detection +return SchemaType.AUTO; } else if (String.class.equals(clazz)) { // If type is String, then we use schema type string, otherwise we fallback on default schema return SchemaType.STRING; @@ -102,6 +110,9 @@ private static SchemaType 
getDefaultSchemaType(Class clazz) { case NONE: return (Schema) Schema.BYTES; +case AUTO: +return (Schema) Schema.AUTO(); + case STRING: return (Schema) Schema.STRING; @@ -165,27 +176,4 @@ private static boolean isProtobufClass(Class pojoClazz) { return new SerDeSchema<>(serDe); } } - -@SuppressWarnings("unchecked") -private static Schema extractSchema(Class clazz, SchemaType type) { -
[incubator-pulsar] branch master updated: [schema] enable Schema.AUTO if functions or connectors are using GenericRecord (#2537)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 9ec8a5e [schema] enable Schema.AUTO if functions or connectors are using GenericRecord (#2537) 9ec8a5e is described below commit 9ec8a5ee4d4a80c50986cde95093b1eb4af31ef0 Author: Sijie Guo AuthorDate: Sat Sep 8 12:51:12 2018 -0700 [schema] enable Schema.AUTO if functions or connectors are using GenericRecord (#2537) * [schema] enable Schema.AUTO if functions or connectors are using GenericRecord * add schema-api to functions-api --- pulsar-functions/api-java/pom.xml | 6 ++ .../pulsar/functions/source/PulsarSource.java | 2 +- .../pulsar/functions/source/TopicSchema.java | 44 + pulsar-functions/java-examples/pom.xml | 1 + .../functions/api/examples/AutoSchemaFunction.java | 33 ++ tests/integration/pom.xml | 12 .../integration/functions/PulsarFunctionsTest.java | 77 +- 7 files changed, 143 insertions(+), 32 deletions(-) diff --git a/pulsar-functions/api-java/pom.xml b/pulsar-functions/api-java/pom.xml index 9a84206..0f21bae 100644 --- a/pulsar-functions/api-java/pom.xml +++ b/pulsar-functions/api-java/pom.xml @@ -38,6 +38,12 @@ + ${project.groupId} + pulsar-client-schema + ${project.version} + + + net.jodah typetools test diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index e1059f3..6eed8e0 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -78,7 +78,7 @@ public class PulsarSource extends PushSource implements MessageListener inputConsumers = configs.entrySet().stream().map(e -> { String topic = e.getKey(); 
ConsumerConfig conf = e.getValue(); -log.info("Creating consumers for topic : {}", topic); +log.info("Creating consumers for topic : {}, schema : {}", topic, conf.getSchema()); ConsumerBuilder cb = pulsarClient.newConsumer(conf.getSchema()) // consume message even if can't decrypt and deliver it along with encryption-ctx .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java index 2ac5b65..76375dc 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java @@ -26,6 +26,7 @@ import java.util.Optional; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.apache.pulsar.client.impl.schema.JSONSchema; @@ -68,24 +69,31 @@ public class TopicSchema { } public Schema getSchema(String topic, Class clazz, SchemaType schemaType) { -return cachedSchemas.computeIfAbsent(topic, t -> extractSchema(clazz, schemaType)); +return cachedSchemas.computeIfAbsent(topic, t -> newSchemaInstance(clazz, schemaType)); } /** * If the topic is already created, we should be able to fetch the schema type (avro, json, ...) 
*/ private SchemaType getSchemaTypeOrDefault(String topic, Class clazz) { -Optional schema = ((PulsarClientImpl) client).getSchema(topic).join(); -if (schema.isPresent()) { -return schema.get().getType(); +if (GenericRecord.class.isAssignableFrom(clazz)) { +return SchemaType.AUTO; } else { -return getDefaultSchemaType(clazz); +Optional schema = ((PulsarClientImpl) client).getSchema(topic).join(); +if (schema.isPresent()) { +return schema.get().getType(); +} else { +return getDefaultSchemaType(clazz); +} } } private static SchemaType getDefaultSchemaType(Class clazz) { if (byte[].class.equals(clazz)) { return SchemaType.NONE; +} else if (GenericRecord.class.isAssignableFrom(clazz)) { +// the function is taking generic record, so we do auto schema detect
[GitHub] merlimat closed pull request #2542: Fix status code of REST API to get partitioned topic stats
merlimat closed pull request #2542: Fix status code of REST API to get partitioned topic stats URL: https://github.com/apache/incubator-pulsar/pull/2542 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f85c6175a7..da51453f8a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -604,6 +604,12 @@ protected PartitionedTopicStats internalGetPartitionedStats(boolean authoritativ stats.add(partitionStats); stats.partitions.put(topicName.getPartition(i).toString(), partitionStats); } +} catch (PulsarAdminException e) { +if (e.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { +throw new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"); +} else { +throw new RestException(e); +} } catch (Exception e) { throw new RestException(e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index ae72737083..c7c090fd44 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -53,6 +53,7 @@ import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response.Status; import lombok.extern.slf4j.Slf4j; @@ -787,6 +788,16 @@ public void partitionedTopics(String topicName) throws Exception { 
assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions, 0); +try { +admin.topics().getPartitionedStats(partitionedTopicName, false); +fail("should have failed"); +} catch (PulsarAdminException e) { +// ok +assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode()); +} catch (Exception e) { +fail(e.getMessage()); +} + // create consumer and subscription URL pulsarUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT); PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, TimeUnit.SECONDS) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] merlimat closed pull request #2540: Enable C++ AuthFactory to parse Athenz params string
merlimat closed pull request #2540: Enable C++ AuthFactory to parse Athenz params string URL: https://github.com/apache/incubator-pulsar/pull/2540 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h b/pulsar-client-cpp/include/pulsar/Authentication.h index bde413434f..2ea1238e2a 100644 --- a/pulsar-client-cpp/include/pulsar/Authentication.h +++ b/pulsar-client-cpp/include/pulsar/Authentication.h @@ -61,6 +61,7 @@ class Authentication { authDataContent = authData_; return ResultOk; } +static ParamMap parseDefaultFormatAuthParams(const std::string& authParamsString); protected: Authentication(); @@ -104,6 +105,7 @@ class AuthTls : public Authentication { AuthTls(AuthenticationDataPtr&); ~AuthTls(); static AuthenticationPtr create(ParamMap& params); +static AuthenticationPtr create(const std::string& authParamsString); static AuthenticationPtr create(const std::string& certificatePath, const std::string& privateKeyPath); const std::string getAuthMethodName() const; Result getAuthData(AuthenticationDataPtr& authDataTls) const; diff --git a/pulsar-client-cpp/lib/Authentication.cc b/pulsar-client-cpp/lib/Authentication.cc index c1025e161d..b3ebf1c633 100644 --- a/pulsar-client-cpp/lib/Authentication.cc +++ b/pulsar-client-cpp/lib/Authentication.cc @@ -60,6 +60,22 @@ Authentication::Authentication() {} Authentication::~Authentication() {} +ParamMap Authentication::parseDefaultFormatAuthParams(const std::string& authParamsString) { +ParamMap paramMap; +if (!authParamsString.empty()) { +std::vector params; +boost::algorithm::split(params, authParamsString, boost::is_any_of(",")); +for (int i = 0; i < params.size(); i++) { +std::vector kv; +boost::algorithm::split(kv, params[i], boost::is_any_of(":")); 
+if (kv.size() == 2) { +paramMap[kv[0]] = kv[1]; +} +} +} +return paramMap; +} + class AuthDisabledData : public AuthenticationDataProvider { public: AuthDisabledData(ParamMap& params) {} @@ -111,6 +127,17 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, ParamMap& } } +AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, const std::string& authParamsString) { +if (boost::iequals(pluginName, TLS_PLUGIN_NAME) || boost::iequals(pluginName, TLS_JAVA_PLUGIN_NAME)) { +return AuthTls::create(authParamsString); +} else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) || + boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) { +return AuthAthenz::create(authParamsString); +} else { +return AuthenticationPtr(); +} +} + AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibPath, const std::string& authParamsString) { { @@ -121,20 +148,7 @@ AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibP } } -ParamMap paramMap; -if (!authParamsString.empty()) { -std::vector params; -boost::algorithm::split(params, authParamsString, boost::is_any_of(",")); -for (int i = 0; i < params.size(); i++) { -std::vector kv; -boost::algorithm::split(kv, params[i], boost::is_any_of(":")); -if (kv.size() == 2) { -paramMap[kv[0]] = kv[1]; -} -} -} - -AuthenticationPtr authPtr = tryCreateBuiltinAuth(pluginNameOrDynamicLibPath, paramMap); +AuthenticationPtr authPtr = tryCreateBuiltinAuth(pluginNameOrDynamicLibPath, authParamsString); if (authPtr) { return authPtr; } @@ -151,6 +165,7 @@ AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibP if (createAuthentication != NULL) { auth = createAuthentication(authParamsString); } else { +ParamMap paramMap = Authentication::parseDefaultFormatAuthParams(authParamsString); return AuthFactory::create(pluginNameOrDynamicLibPath, paramMap); } } diff --git a/pulsar-client-cpp/lib/auth/AuthTls.cc b/pulsar-client-cpp/lib/auth/AuthTls.cc index 
f076aafb3b..fcf6571a25 100644 --- a/pulsar-client-cpp/lib/auth/AuthTls.cc +++ b/pulsar-client-cpp/lib/auth/AuthTls.cc @@ -36,6 +36,11 @@ AuthTls::AuthTls(AuthenticationDataPtr& authDataTls) { authDataTls_ = authDataTl AuthTls::~AuthTls() {} +AuthenticationPtr AuthTls::create(const std::string& authParamsString) { +ParamMap params = parseDefaultFormatAuthParams(authParamsString); +return create(params); +} + AuthenticationPtr AuthTls:
[incubator-pulsar] branch master updated: Enable C++ AuthFactory to parse Athenz params string (#2540)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new 6451efd Enable C++ AuthFactory to parse Athenz params string (#2540) 6451efd is described below commit 6451efd6515026288ae8aa1b7118e97904eecf76 Author: massakam AuthorDate: Sun Sep 9 04:50:35 2018 +0900 Enable C++ AuthFactory to parse Athenz params string (#2540) --- pulsar-client-cpp/include/pulsar/Authentication.h | 2 + pulsar-client-cpp/lib/Authentication.cc | 43 +++-- pulsar-client-cpp/lib/auth/AuthTls.cc | 5 ++ pulsar-client-cpp/tests/AuthPluginTest.cc | 58 +++ 4 files changed, 94 insertions(+), 14 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h b/pulsar-client-cpp/include/pulsar/Authentication.h index bde4134..2ea1238 100644 --- a/pulsar-client-cpp/include/pulsar/Authentication.h +++ b/pulsar-client-cpp/include/pulsar/Authentication.h @@ -61,6 +61,7 @@ class Authentication { authDataContent = authData_; return ResultOk; } +static ParamMap parseDefaultFormatAuthParams(const std::string& authParamsString); protected: Authentication(); @@ -104,6 +105,7 @@ class AuthTls : public Authentication { AuthTls(AuthenticationDataPtr&); ~AuthTls(); static AuthenticationPtr create(ParamMap& params); +static AuthenticationPtr create(const std::string& authParamsString); static AuthenticationPtr create(const std::string& certificatePath, const std::string& privateKeyPath); const std::string getAuthMethodName() const; Result getAuthData(AuthenticationDataPtr& authDataTls) const; diff --git a/pulsar-client-cpp/lib/Authentication.cc b/pulsar-client-cpp/lib/Authentication.cc index c1025e1..b3ebf1c 100644 --- a/pulsar-client-cpp/lib/Authentication.cc +++ b/pulsar-client-cpp/lib/Authentication.cc @@ -60,6 +60,22 @@ Authentication::Authentication() {} Authentication::~Authentication() {} +ParamMap 
Authentication::parseDefaultFormatAuthParams(const std::string& authParamsString) { +ParamMap paramMap; +if (!authParamsString.empty()) { +std::vector params; +boost::algorithm::split(params, authParamsString, boost::is_any_of(",")); +for (int i = 0; i < params.size(); i++) { +std::vector kv; +boost::algorithm::split(kv, params[i], boost::is_any_of(":")); +if (kv.size() == 2) { +paramMap[kv[0]] = kv[1]; +} +} +} +return paramMap; +} + class AuthDisabledData : public AuthenticationDataProvider { public: AuthDisabledData(ParamMap& params) {} @@ -111,6 +127,17 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, ParamMap& } } +AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, const std::string& authParamsString) { +if (boost::iequals(pluginName, TLS_PLUGIN_NAME) || boost::iequals(pluginName, TLS_JAVA_PLUGIN_NAME)) { +return AuthTls::create(authParamsString); +} else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) || + boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) { +return AuthAthenz::create(authParamsString); +} else { +return AuthenticationPtr(); +} +} + AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibPath, const std::string& authParamsString) { { @@ -121,20 +148,7 @@ AuthenticationPtr AuthFactory::create(const std::string& pluginNameOrDynamicLibP } } -ParamMap paramMap; -if (!authParamsString.empty()) { -std::vector params; -boost::algorithm::split(params, authParamsString, boost::is_any_of(",")); -for (int i = 0; i < params.size(); i++) { -std::vector kv; -boost::algorithm::split(kv, params[i], boost::is_any_of(":")); -if (kv.size() == 2) { -paramMap[kv[0]] = kv[1]; -} -} -} - -AuthenticationPtr authPtr = tryCreateBuiltinAuth(pluginNameOrDynamicLibPath, paramMap); +AuthenticationPtr authPtr = tryCreateBuiltinAuth(pluginNameOrDynamicLibPath, authParamsString); if (authPtr) { return authPtr; } @@ -151,6 +165,7 @@ AuthenticationPtr AuthFactory::create(const std::string& 
pluginNameOrDynamicLibP if (createAuthentication != NULL) { auth = createAuthentication(authParamsString); } else { +ParamMap paramMap = Authentication::parseDefaultFormatAuthParams(authParamsString); return AuthFactory::create(pluginNameOrDynamicLibPath, paramMap); } } diff --git a/pulsar-client-cpp/lib/auth/AuthTls.cc b/pulsar-client-cpp/lib/auth/AuthTls.cc index f076aaf..fcf6571 100644 ---
[incubator-pulsar] branch master updated: Fix status code of REST API to get partitioned topic stats (#2542)
This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new cd185d1 Fix status code of REST API to get partitioned topic stats (#2542) cd185d1 is described below commit cd185d1d9bb3319cae3543c561454d40ac54d899 Author: massakam AuthorDate: Sun Sep 9 04:50:52 2018 +0900 Fix status code of REST API to get partitioned topic stats (#2542) --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 6 ++ .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 11 +++ 2 files changed, 17 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f85c617..da51453 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -604,6 +604,12 @@ public class PersistentTopicsBase extends AdminResource { stats.add(partitionStats); stats.partitions.put(topicName.getPartition(i).toString(), partitionStats); } +} catch (PulsarAdminException e) { +if (e.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { +throw new RestException(Status.NOT_FOUND, "Internal topics have not been generated yet"); +} else { +throw new RestException(e); +} } catch (Exception e) { throw new RestException(e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index ae72737..c7c090f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -53,6 +53,7 @@ import 
java.util.concurrent.TimeUnit; import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.Response.Status; import lombok.extern.slf4j.Slf4j; @@ -787,6 +788,16 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions, 0); +try { +admin.topics().getPartitionedStats(partitionedTopicName, false); +fail("should have failed"); +} catch (PulsarAdminException e) { +// ok +assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode()); +} catch (Exception e) { +fail(e.getMessage()); +} + // create consumer and subscription URL pulsarUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT); PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, TimeUnit.SECONDS)