[GitHub] merlimat commented on a change in pull request #2543: Add ServiceUrlProvider and add method forceCloseConnection in PulsarC…

2018-09-08 Thread GitBox
merlimat commented on a change in pull request #2543: Add ServiceUrlProvider 
and add method forceCloseConnection in PulsarC…
URL: https://github.com/apache/incubator-pulsar/pull/2543#discussion_r216139475
 
 

 ##
 File path: 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 ##
 @@ -706,6 +706,58 @@ public void shutdown() throws PulsarClientException {
 }
 }
 
+@Override
+public void forceCloseConnection() {
+if (this.producers != null) {
 
 Review comment:
   There are a few problems with this way of closing the connections. 
   
   `channelInactive()` is supposed to be called by Netty when the connection 
gets closed. With the current approach, the original TCP connection will stay 
open and the broker will still have the producer/consumer registered as 
connected. 
   
   The right approach here should be to close all the connections in the 
`ConnectionPool`. That will trigger reconnection for all 
producers/consumers.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #2529: Added cmake install for lib and headers

2018-09-08 Thread GitBox
merlimat commented on issue #2529: Added cmake install for lib and headers
URL: https://github.com/apache/incubator-pulsar/pull/2529#issuecomment-419668830
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat commented on issue #2511: [documentation] Add section about licensing at contribution guide

2018-09-08 Thread GitBox
merlimat commented on issue #2511: [documentation] Add section about licensing 
at contribution guide
URL: https://github.com/apache/incubator-pulsar/pull/2511#issuecomment-419668698
 
 
   run java8 tests


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat closed pull request #2537: [schema] enable Schema.AUTO if functions or connectors are using GenericRecord

2018-09-08 Thread GitBox
merlimat closed pull request #2537: [schema] enable Schema.AUTO if functions or 
connectors are using GenericRecord
URL: https://github.com/apache/incubator-pulsar/pull/2537
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/api-java/pom.xml 
b/pulsar-functions/api-java/pom.xml
index 9a8420608a..0f21bae460 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -37,6 +37,12 @@
   slf4j-api
 
 
+
+  ${project.groupId}
+  pulsar-client-schema
+  ${project.version}
+
+
 
   net.jodah
   typetools
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index e1059f390e..6eed8e0142 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -78,7 +78,7 @@ public void open(Map config, SourceContext 
sourceContext) throws
 inputConsumers = configs.entrySet().stream().map(e -> {
 String topic = e.getKey();
 ConsumerConfig conf = e.getValue();
-log.info("Creating consumers for topic : {}",  topic);
+log.info("Creating consumers for topic : {}, schema : {}",  topic, 
conf.getSchema());
 ConsumerBuilder cb = pulsarClient.newConsumer(conf.getSchema())
 // consume message even if can't decrypt and deliver it 
along with encryption-ctx
 .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 2ac5b65764..76375dcc67 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -26,6 +26,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -68,24 +69,31 @@ public TopicSchema(PulsarClient client) {
 }
 
 public Schema getSchema(String topic, Class clazz, SchemaType 
schemaType) {
-return cachedSchemas.computeIfAbsent(topic, t -> extractSchema(clazz, 
schemaType));
+return cachedSchemas.computeIfAbsent(topic, t -> 
newSchemaInstance(clazz, schemaType));
 }
 
 /**
  * If the topic is already created, we should be able to fetch the schema 
type (avro, json, ...)
  */
 private SchemaType getSchemaTypeOrDefault(String topic, Class clazz) {
-Optional schema = ((PulsarClientImpl) 
client).getSchema(topic).join();
-if (schema.isPresent()) {
-return schema.get().getType();
+if (GenericRecord.class.isAssignableFrom(clazz)) {
+return SchemaType.AUTO;
 } else {
-return getDefaultSchemaType(clazz);
+Optional schema = ((PulsarClientImpl) 
client).getSchema(topic).join();
+if (schema.isPresent()) {
+return schema.get().getType();
+} else {
+return getDefaultSchemaType(clazz);
+}
 }
 }
 
 private static SchemaType getDefaultSchemaType(Class clazz) {
 if (byte[].class.equals(clazz)) {
 return SchemaType.NONE;
+} else if (GenericRecord.class.isAssignableFrom(clazz)) {
+// the function is taking generic record, so we do auto schema 
detection
+return SchemaType.AUTO;
 } else if (String.class.equals(clazz)) {
 // If type is String, then we use schema type string, otherwise we 
fallback on default schema
 return SchemaType.STRING;
@@ -102,6 +110,9 @@ private static SchemaType getDefaultSchemaType(Class 
clazz) {
 case NONE:
 return (Schema) Schema.BYTES;
 
+case AUTO:
+return (Schema) Schema.AUTO();
+
 case STRING:
 return (Schema) Schema.STRING;
 
@@ -165,27 +176,4 @@ private static boolean isProtobufClass(Class pojoClazz) 
{
 return new SerDeSchema<>(serDe);
 }
 }
-
-@SuppressWarnings("unchecked")
-private static  Schema extractSchema(Class clazz, SchemaType 
type) {
-   

[incubator-pulsar] branch master updated: [schema] enable Schema.AUTO if functions or connectors are using GenericRecord (#2537)

2018-09-08 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 9ec8a5e  [schema] enable Schema.AUTO if functions or connectors are 
using GenericRecord (#2537)
9ec8a5e is described below

commit 9ec8a5ee4d4a80c50986cde95093b1eb4af31ef0
Author: Sijie Guo 
AuthorDate: Sat Sep 8 12:51:12 2018 -0700

[schema] enable Schema.AUTO if functions or connectors are using 
GenericRecord (#2537)

* [schema] enable Schema.AUTO if functions or connectors are using 
GenericRecord

* add schema-api to functions-api
---
 pulsar-functions/api-java/pom.xml  |  6 ++
 .../pulsar/functions/source/PulsarSource.java  |  2 +-
 .../pulsar/functions/source/TopicSchema.java   | 44 +
 pulsar-functions/java-examples/pom.xml |  1 +
 .../functions/api/examples/AutoSchemaFunction.java | 33 ++
 tests/integration/pom.xml  | 12 
 .../integration/functions/PulsarFunctionsTest.java | 77 +-
 7 files changed, 143 insertions(+), 32 deletions(-)

diff --git a/pulsar-functions/api-java/pom.xml 
b/pulsar-functions/api-java/pom.xml
index 9a84206..0f21bae 100644
--- a/pulsar-functions/api-java/pom.xml
+++ b/pulsar-functions/api-java/pom.xml
@@ -38,6 +38,12 @@
 
 
 
+  ${project.groupId}
+  pulsar-client-schema
+  ${project.version}
+
+
+
   net.jodah
   typetools
   test
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index e1059f3..6eed8e0 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -78,7 +78,7 @@ public class PulsarSource extends PushSource implements 
MessageListener
 inputConsumers = configs.entrySet().stream().map(e -> {
 String topic = e.getKey();
 ConsumerConfig conf = e.getValue();
-log.info("Creating consumers for topic : {}",  topic);
+log.info("Creating consumers for topic : {}, schema : {}",  topic, 
conf.getSchema());
 ConsumerBuilder cb = pulsarClient.newConsumer(conf.getSchema())
 // consume message even if can't decrypt and deliver it 
along with encryption-ctx
 .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index 2ac5b65..76375dc 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -26,6 +26,7 @@ import java.util.Optional;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -68,24 +69,31 @@ public class TopicSchema {
 }
 
 public Schema getSchema(String topic, Class clazz, SchemaType 
schemaType) {
-return cachedSchemas.computeIfAbsent(topic, t -> extractSchema(clazz, 
schemaType));
+return cachedSchemas.computeIfAbsent(topic, t -> 
newSchemaInstance(clazz, schemaType));
 }
 
 /**
  * If the topic is already created, we should be able to fetch the schema 
type (avro, json, ...)
  */
 private SchemaType getSchemaTypeOrDefault(String topic, Class clazz) {
-Optional schema = ((PulsarClientImpl) 
client).getSchema(topic).join();
-if (schema.isPresent()) {
-return schema.get().getType();
+if (GenericRecord.class.isAssignableFrom(clazz)) {
+return SchemaType.AUTO;
 } else {
-return getDefaultSchemaType(clazz);
+Optional schema = ((PulsarClientImpl) 
client).getSchema(topic).join();
+if (schema.isPresent()) {
+return schema.get().getType();
+} else {
+return getDefaultSchemaType(clazz);
+}
 }
 }
 
 private static SchemaType getDefaultSchemaType(Class clazz) {
 if (byte[].class.equals(clazz)) {
 return SchemaType.NONE;
+} else if (GenericRecord.class.isAssignableFrom(clazz)) {
+// the function is taking generic record, so we do auto schema 
detect

[GitHub] merlimat closed pull request #2542: Fix status code of REST API to get partitioned topic stats

2018-09-08 Thread GitBox
merlimat closed pull request #2542: Fix status code of REST API to get 
partitioned topic stats
URL: https://github.com/apache/incubator-pulsar/pull/2542
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f85c6175a7..da51453f8a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -604,6 +604,12 @@ protected PartitionedTopicStats 
internalGetPartitionedStats(boolean authoritativ
 stats.add(partitionStats);
 stats.partitions.put(topicName.getPartition(i).toString(), 
partitionStats);
 }
+} catch (PulsarAdminException e) {
+if (e.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
+throw new RestException(Status.NOT_FOUND, "Internal topics 
have not been generated yet");
+} else {
+throw new RestException(e);
+}
 } catch (Exception e) {
 throw new RestException(e);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index ae72737083..c7c090fd44 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -53,6 +53,7 @@
 
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response.Status;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -787,6 +788,16 @@ public void partitionedTopics(String topicName) throws 
Exception {
 
assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions,
 0);
 
+try {
+admin.topics().getPartitionedStats(partitionedTopicName, false);
+fail("should have failed");
+} catch (PulsarAdminException e) {
+// ok
+assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode());
+} catch (Exception e) {
+fail(e.getMessage());
+}
+
 // create consumer and subscription
 URL pulsarUrl = new URL("http://127.0.0.1"; + ":" + 
BROKER_WEBSERVICE_PORT);
 PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, 
TimeUnit.SECONDS)


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] merlimat closed pull request #2540: Enable C++ AuthFactory to parse Athenz params string

2018-09-08 Thread GitBox
merlimat closed pull request #2540: Enable C++ AuthFactory to parse Athenz 
params string
URL: https://github.com/apache/incubator-pulsar/pull/2540
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h 
b/pulsar-client-cpp/include/pulsar/Authentication.h
index bde413434f..2ea1238e2a 100644
--- a/pulsar-client-cpp/include/pulsar/Authentication.h
+++ b/pulsar-client-cpp/include/pulsar/Authentication.h
@@ -61,6 +61,7 @@ class Authentication {
 authDataContent = authData_;
 return ResultOk;
 }
+static ParamMap parseDefaultFormatAuthParams(const std::string& 
authParamsString);
 
protected:
 Authentication();
@@ -104,6 +105,7 @@ class AuthTls : public Authentication {
 AuthTls(AuthenticationDataPtr&);
 ~AuthTls();
 static AuthenticationPtr create(ParamMap& params);
+static AuthenticationPtr create(const std::string& authParamsString);
 static AuthenticationPtr create(const std::string& certificatePath, const 
std::string& privateKeyPath);
 const std::string getAuthMethodName() const;
 Result getAuthData(AuthenticationDataPtr& authDataTls) const;
diff --git a/pulsar-client-cpp/lib/Authentication.cc 
b/pulsar-client-cpp/lib/Authentication.cc
index c1025e161d..b3ebf1c633 100644
--- a/pulsar-client-cpp/lib/Authentication.cc
+++ b/pulsar-client-cpp/lib/Authentication.cc
@@ -60,6 +60,22 @@ Authentication::Authentication() {}
 
 Authentication::~Authentication() {}
 
+ParamMap Authentication::parseDefaultFormatAuthParams(const std::string& 
authParamsString) {
+ParamMap paramMap;
+if (!authParamsString.empty()) {
+std::vector params;
+boost::algorithm::split(params, authParamsString, 
boost::is_any_of(","));
+for (int i = 0; i < params.size(); i++) {
+std::vector kv;
+boost::algorithm::split(kv, params[i], boost::is_any_of(":"));
+if (kv.size() == 2) {
+paramMap[kv[0]] = kv[1];
+}
+}
+}
+return paramMap;
+}
+
 class AuthDisabledData : public AuthenticationDataProvider {
public:
 AuthDisabledData(ParamMap& params) {}
@@ -111,6 +127,17 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& 
pluginName, ParamMap&
 }
 }
 
+AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, const 
std::string& authParamsString) {
+if (boost::iequals(pluginName, TLS_PLUGIN_NAME) || 
boost::iequals(pluginName, TLS_JAVA_PLUGIN_NAME)) {
+return AuthTls::create(authParamsString);
+} else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
+   boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
+return AuthAthenz::create(authParamsString);
+} else {
+return AuthenticationPtr();
+}
+}
+
 AuthenticationPtr AuthFactory::create(const std::string& 
pluginNameOrDynamicLibPath,
   const std::string& authParamsString) {
 {
@@ -121,20 +148,7 @@ AuthenticationPtr AuthFactory::create(const std::string& 
pluginNameOrDynamicLibP
 }
 }
 
-ParamMap paramMap;
-if (!authParamsString.empty()) {
-std::vector params;
-boost::algorithm::split(params, authParamsString, 
boost::is_any_of(","));
-for (int i = 0; i < params.size(); i++) {
-std::vector kv;
-boost::algorithm::split(kv, params[i], boost::is_any_of(":"));
-if (kv.size() == 2) {
-paramMap[kv[0]] = kv[1];
-}
-}
-}
-
-AuthenticationPtr authPtr = 
tryCreateBuiltinAuth(pluginNameOrDynamicLibPath, paramMap);
+AuthenticationPtr authPtr = 
tryCreateBuiltinAuth(pluginNameOrDynamicLibPath, authParamsString);
 if (authPtr) {
 return authPtr;
 }
@@ -151,6 +165,7 @@ AuthenticationPtr AuthFactory::create(const std::string& 
pluginNameOrDynamicLibP
 if (createAuthentication != NULL) {
 auth = createAuthentication(authParamsString);
 } else {
+ParamMap paramMap = 
Authentication::parseDefaultFormatAuthParams(authParamsString);
 return AuthFactory::create(pluginNameOrDynamicLibPath, paramMap);
 }
 }
diff --git a/pulsar-client-cpp/lib/auth/AuthTls.cc 
b/pulsar-client-cpp/lib/auth/AuthTls.cc
index f076aafb3b..fcf6571a25 100644
--- a/pulsar-client-cpp/lib/auth/AuthTls.cc
+++ b/pulsar-client-cpp/lib/auth/AuthTls.cc
@@ -36,6 +36,11 @@ AuthTls::AuthTls(AuthenticationDataPtr& authDataTls) { 
authDataTls_ = authDataTl
 
 AuthTls::~AuthTls() {}
 
+AuthenticationPtr AuthTls::create(const std::string& authParamsString) {
+ParamMap params = parseDefaultFormatAuthParams(authParamsString);
+return create(params);
+}
+
 AuthenticationPtr AuthTls:

[incubator-pulsar] branch master updated: Enable C++ AuthFactory to parse Athenz params string (#2540)

2018-09-08 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new 6451efd  Enable C++ AuthFactory to parse Athenz params string (#2540)
6451efd is described below

commit 6451efd6515026288ae8aa1b7118e97904eecf76
Author: massakam 
AuthorDate: Sun Sep 9 04:50:35 2018 +0900

Enable C++ AuthFactory to parse Athenz params string (#2540)
---
 pulsar-client-cpp/include/pulsar/Authentication.h |  2 +
 pulsar-client-cpp/lib/Authentication.cc   | 43 +++--
 pulsar-client-cpp/lib/auth/AuthTls.cc |  5 ++
 pulsar-client-cpp/tests/AuthPluginTest.cc | 58 +++
 4 files changed, 94 insertions(+), 14 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Authentication.h 
b/pulsar-client-cpp/include/pulsar/Authentication.h
index bde4134..2ea1238 100644
--- a/pulsar-client-cpp/include/pulsar/Authentication.h
+++ b/pulsar-client-cpp/include/pulsar/Authentication.h
@@ -61,6 +61,7 @@ class Authentication {
 authDataContent = authData_;
 return ResultOk;
 }
+static ParamMap parseDefaultFormatAuthParams(const std::string& 
authParamsString);
 
protected:
 Authentication();
@@ -104,6 +105,7 @@ class AuthTls : public Authentication {
 AuthTls(AuthenticationDataPtr&);
 ~AuthTls();
 static AuthenticationPtr create(ParamMap& params);
+static AuthenticationPtr create(const std::string& authParamsString);
 static AuthenticationPtr create(const std::string& certificatePath, const 
std::string& privateKeyPath);
 const std::string getAuthMethodName() const;
 Result getAuthData(AuthenticationDataPtr& authDataTls) const;
diff --git a/pulsar-client-cpp/lib/Authentication.cc 
b/pulsar-client-cpp/lib/Authentication.cc
index c1025e1..b3ebf1c 100644
--- a/pulsar-client-cpp/lib/Authentication.cc
+++ b/pulsar-client-cpp/lib/Authentication.cc
@@ -60,6 +60,22 @@ Authentication::Authentication() {}
 
 Authentication::~Authentication() {}
 
+ParamMap Authentication::parseDefaultFormatAuthParams(const std::string& 
authParamsString) {
+ParamMap paramMap;
+if (!authParamsString.empty()) {
+std::vector params;
+boost::algorithm::split(params, authParamsString, 
boost::is_any_of(","));
+for (int i = 0; i < params.size(); i++) {
+std::vector kv;
+boost::algorithm::split(kv, params[i], boost::is_any_of(":"));
+if (kv.size() == 2) {
+paramMap[kv[0]] = kv[1];
+}
+}
+}
+return paramMap;
+}
+
 class AuthDisabledData : public AuthenticationDataProvider {
public:
 AuthDisabledData(ParamMap& params) {}
@@ -111,6 +127,17 @@ AuthenticationPtr tryCreateBuiltinAuth(const std::string& 
pluginName, ParamMap&
 }
 }
 
+AuthenticationPtr tryCreateBuiltinAuth(const std::string& pluginName, const 
std::string& authParamsString) {
+if (boost::iequals(pluginName, TLS_PLUGIN_NAME) || 
boost::iequals(pluginName, TLS_JAVA_PLUGIN_NAME)) {
+return AuthTls::create(authParamsString);
+} else if (boost::iequals(pluginName, ATHENZ_PLUGIN_NAME) ||
+   boost::iequals(pluginName, ATHENZ_JAVA_PLUGIN_NAME)) {
+return AuthAthenz::create(authParamsString);
+} else {
+return AuthenticationPtr();
+}
+}
+
 AuthenticationPtr AuthFactory::create(const std::string& 
pluginNameOrDynamicLibPath,
   const std::string& authParamsString) {
 {
@@ -121,20 +148,7 @@ AuthenticationPtr AuthFactory::create(const std::string& 
pluginNameOrDynamicLibP
 }
 }
 
-ParamMap paramMap;
-if (!authParamsString.empty()) {
-std::vector params;
-boost::algorithm::split(params, authParamsString, 
boost::is_any_of(","));
-for (int i = 0; i < params.size(); i++) {
-std::vector kv;
-boost::algorithm::split(kv, params[i], boost::is_any_of(":"));
-if (kv.size() == 2) {
-paramMap[kv[0]] = kv[1];
-}
-}
-}
-
-AuthenticationPtr authPtr = 
tryCreateBuiltinAuth(pluginNameOrDynamicLibPath, paramMap);
+AuthenticationPtr authPtr = 
tryCreateBuiltinAuth(pluginNameOrDynamicLibPath, authParamsString);
 if (authPtr) {
 return authPtr;
 }
@@ -151,6 +165,7 @@ AuthenticationPtr AuthFactory::create(const std::string& 
pluginNameOrDynamicLibP
 if (createAuthentication != NULL) {
 auth = createAuthentication(authParamsString);
 } else {
+ParamMap paramMap = 
Authentication::parseDefaultFormatAuthParams(authParamsString);
 return AuthFactory::create(pluginNameOrDynamicLibPath, paramMap);
 }
 }
diff --git a/pulsar-client-cpp/lib/auth/AuthTls.cc 
b/pulsar-client-cpp/lib/auth/AuthTls.cc
index f076aaf..fcf6571 100644
--- 

[incubator-pulsar] branch master updated: Fix status code of REST API to get partitioned topic stats (#2542)

2018-09-08 Thread mmerli
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
 new cd185d1  Fix status code of REST API to get partitioned topic stats 
(#2542)
cd185d1 is described below

commit cd185d1d9bb3319cae3543c561454d40ac54d899
Author: massakam 
AuthorDate: Sun Sep 9 04:50:52 2018 +0900

Fix status code of REST API to get partitioned topic stats (#2542)
---
 .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java |  6 ++
 .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 11 +++
 2 files changed, 17 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index f85c617..da51453 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -604,6 +604,12 @@ public class PersistentTopicsBase extends AdminResource {
 stats.add(partitionStats);
 stats.partitions.put(topicName.getPartition(i).toString(), 
partitionStats);
 }
+} catch (PulsarAdminException e) {
+if (e.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
+throw new RestException(Status.NOT_FOUND, "Internal topics 
have not been generated yet");
+} else {
+throw new RestException(e);
+}
 } catch (Exception e) {
 throw new RestException(e);
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index ae72737..c7c090f 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.client.InvocationCallback;
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response.Status;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -787,6 +788,16 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
 
assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions,
 0);
 
+try {
+admin.topics().getPartitionedStats(partitionedTopicName, false);
+fail("should have failed");
+} catch (PulsarAdminException e) {
+// ok
+assertEquals(e.getStatusCode(), Status.NOT_FOUND.getStatusCode());
+} catch (Exception e) {
+fail(e.getMessage());
+}
+
 // create consumer and subscription
 URL pulsarUrl = new URL("http://127.0.0.1"; + ":" + 
BROKER_WEBSERVICE_PORT);
 PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, 
TimeUnit.SECONDS)